mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r528730829
##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -142,7 +143,14 @@ private static void run(
listener = new ClusterBasedProcessorLifecycleListener(config,
processorLifecycleListener, container::shutdown);
container.setContainerListener(listener);
- ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+ JobConfig jobConfig = new JobConfig(config);
+ boolean isJobCoordinatorHighAvailabilityEnabled = jobConfig.getJobCoordinatorHighAvailabilityEnabled();
+ long retryCount = jobConfig.getJobCoordinatorDynamicHeartbeatRetryCount();
+ long sleepDurationForReconnectWithAM = jobConfig.getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs();
Review comment:
nit: inline these instead, since the variables aren't used afterwards and the getter names already make explicit what is being passed as arguments.
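For illustration, a minimal self-contained sketch of the inlined form. `JobConfigStub` and the simplified `createContainerHeartbeatMonitor` signature are hypothetical stand-ins (the real method also takes the container and the metadata store), and the key-to-getter mapping and defaults are assumed from the docs table in this PR:

```java
import java.util.Map;

// Hypothetical stand-in for JobConfig: only the three getters from the diff,
// with keys and defaults assumed from the docs table in this PR.
final class JobConfigStub {
  private final Map<String, String> props;

  JobConfigStub(Map<String, String> props) {
    this.props = props;
  }

  boolean getJobCoordinatorHighAvailabilityEnabled() {
    return Boolean.parseBoolean(
        props.getOrDefault("job.coordinator.high-availability.enabled", "false"));
  }

  long getJobCoordinatorDynamicHeartbeatRetryCount() {
    return Long.parseLong(
        props.getOrDefault("job.coordinator.dynamic-heartbeat.retry.count", "5"));
  }

  long getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs() {
    return Long.parseLong(props.getOrDefault(
        "job.coordinator.dynamic-heartbeat.reconnect-sleep-duration.ms", "10000"));
  }
}

final class InlineGettersSketch {
  // Hypothetical, simplified factory that just echoes its arguments.
  static String createContainerHeartbeatMonitor(boolean haEnabled, long retryCount, long sleepMs) {
    return "ha=" + haEnabled + ", retries=" + retryCount + ", sleepMs=" + sleepMs;
  }

  static String launch(Map<String, String> props) {
    JobConfigStub jobConfig = new JobConfigStub(props);
    // Getters passed inline: no single-use locals, and each argument's
    // meaning is readable directly from the getter name.
    return createContainerHeartbeatMonitor(
        jobConfig.getJobCoordinatorHighAvailabilityEnabled(),
        jobConfig.getJobCoordinatorDynamicHeartbeatRetryCount(),
        jobConfig.getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs());
  }
}
```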
##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -189,9 +197,13 @@ private static void run(
/**
* Creates a new container heartbeat monitor if possible.
* @param container the container to monitor
+ * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+ * @param isJobCoordinatorHighAvailabilityEnabled whether coordinator HA is enabled to fetch new coordinator url
Review comment:
nit: missing docs for the newly added parameters.
##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,79 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
verify(this.scheduler).shutdown();
}
+ @Test
+ public void testReestablishConnectionWithNewAM() throws InterruptedException {
+ String containerExecutionId = "0";
+ String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+ this.containerHeartbeatMonitor =
+ spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+ containerExecutionId, coordinatorStreamStore, true, 5, 10));
+ CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+ ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+ ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+ when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+ when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+ when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+ this.containerHeartbeatMonitor.start();
+ // wait for the executor to finish the heartbeat check task
+ boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
Review comment:
We should instead pass in our own mocked executor that runs the task passed to it and then counts down the latch. Here is an example:
https://github.com/apache/samza/pull/1334
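A minimal stdlib-only sketch of such an executor, assuming the monitor schedules its heartbeat check through `scheduleAtFixedRate` (as the name `schedulerFixedRateExecutionLatch` suggests). `InlineFixedRateScheduler` is a hypothetical name, and this is one possible shape rather than necessarily the one used in the linked PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical test scheduler: runs a fixed-rate task exactly once, inline on
// the calling thread, then counts down the latch the test waits on.
// Not final, so it can still be wrapped in a Mockito spy.
class InlineFixedRateScheduler extends ScheduledThreadPoolExecutor {
  private final CountDownLatch fixedRateExecutionLatch;

  InlineFixedRateScheduler(CountDownLatch fixedRateExecutionLatch) {
    super(1);
    this.fixedRateExecutionLatch = fixedRateExecutionLatch;
  }

  @Override
  public ScheduledFuture<?> scheduleAtFixedRate(
      Runnable command, long initialDelay, long period, TimeUnit unit) {
    // Obtain a trivially completing future first, so the task may shut this
    // executor down without the return path being rejected.
    Runnable noop = () -> { };
    ScheduledFuture<?> future = super.schedule(noop, 0, TimeUnit.MILLISECONDS);
    try {
      command.run();
    } finally {
      fixedRateExecutionLatch.countDown();
    }
    return future;
  }
}
```

Because the task has already run by the time `scheduleAtFixedRate` returns, the test can assert on the latch count directly instead of waiting up to 2 seconds; wrapping the instance in Mockito's `spy(...)` keeps `verify(this.scheduler).shutdown()` usable.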
##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
|job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
|cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
|cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.
+|job.coordinator.dynamic-heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with AM, this count gives the number of times an already running container will attempt to establish heartbeat with new AM|
+|job.coordinator.dynamic-heartbeat.reconnect-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.|
Review comment:
Following up on the same thread in the refactor PR, we should keep these configurations as YARN configurations.
I'd also recommend removing `dynamic` from the configuration names, e.g. `yarn.container.heartbeat.retry.count` and `yarn.container.heartbeat.retry-sleep-duration.ms`, or something similar.
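To make the suggestion concrete, a sketch of the reconnect policy the docs table describes, read from the key names proposed above. The key names are provisional, the defaults (5 attempts, 10000 ms) are taken from the table, and `HeartbeatRetrySketch` plus the `attempt` callback (for example: fetch the new AM URL from the metadata store and request a heartbeat from it) are hypothetical:

```java
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical helper: key names follow this review's suggestion and are
// not final; defaults are the ones from the docs table in this PR.
final class HeartbeatRetrySketch {
  static final String RETRY_COUNT = "yarn.container.heartbeat.retry.count";
  static final String RETRY_SLEEP_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";

  static long retryCount(Map<String, String> config) {
    return Long.parseLong(config.getOrDefault(RETRY_COUNT, "5"));
  }

  static long retrySleepMs(Map<String, String> config) {
    return Long.parseLong(config.getOrDefault(RETRY_SLEEP_MS, "10000"));
  }

  // Tries up to retryCount times to re-establish the heartbeat, sleeping
  // retrySleepMs between attempts; returns true on the first success.
  static boolean reconnect(Map<String, String> config, BooleanSupplier attempt) {
    long retries = retryCount(config);
    long sleepMs = retrySleepMs(config);
    for (long i = 0; i < retries; i++) {
      if (attempt.getAsBoolean()) {
        return true;
      }
      if (i < retries - 1) {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          // Preserve the interrupt and give up; the caller treats this as a failure.
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false;
  }
}
```

Keeping the keys under `yarn.` also keeps the policy out of the generic `job.coordinator` namespace, where it would mean little for non-YARN deployments.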
##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.
Review comment:
I am leaning towards putting this under the `YARN` namespace as well, since it is very specific to how we achieve availability in YARN when the AM goes down. For example, in the ZK model this doesn't mean much.
IIRC, in the long run we want to abstract away the container (exposing only `StreamProcessor`) and keep it as an internal construct.
@prateekm, can you chime in with your opinion here?
##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,79 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
verify(this.scheduler).shutdown();
}
+ @Test
+ public void testReestablishConnectionWithNewAM() throws InterruptedException {
+ String containerExecutionId = "0";
+ String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+ this.containerHeartbeatMonitor =
+ spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+ containerExecutionId, coordinatorStreamStore, true, 5, 10));
+ CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+ ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+ ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
Review comment:
Can we extract these into class-level constants?
Perhaps name them `FAILURE_RESPONSE` and `SUCCESS_RESPONSE` to be explicit.
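A sketch of the suggested refactor. `ContainerHeartbeatResponseStub` is a hypothetical stand-in for `ContainerHeartbeatResponse` that models only the boolean constructor used in the test; sharing one instance across tests is only safe if the real response is immutable, which is assumed here:

```java
// Hypothetical stand-in for ContainerHeartbeatResponse (immutable value object).
final class ContainerHeartbeatResponseStub {
  private final boolean alive;

  ContainerHeartbeatResponseStub(boolean alive) {
    this.alive = alive;
  }

  boolean isAlive() {
    return alive;
  }
}

final class TestContainerHeartbeatMonitorSketch {
  // Class-level constants replacing the per-test response1/response2 locals.
  static final ContainerHeartbeatResponseStub FAILURE_RESPONSE =
      new ContainerHeartbeatResponseStub(false);
  static final ContainerHeartbeatResponseStub SUCCESS_RESPONSE =
      new ContainerHeartbeatResponseStub(true);
}
```

With the real type, the stubbing then reads as intended: `when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE);`.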
----------------------------------------------------------------
This is an automated message from the Apache Git Service.