mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r528730829



##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -142,7 +143,14 @@ private static void run(
          listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+      JobConfig jobConfig = new JobConfig(config);
+      boolean isJobCoordinatorHighAvailabilityEnabled = jobConfig.getJobCoordinatorHighAvailabilityEnabled();
+      long retryCount = jobConfig.getJobCoordinatorDynamicHeartbeatRetryCount();
+      long sleepDurationForReconnectWithAM = jobConfig.getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs();

Review comment:
       nit: inline these instead, since the variables aren't used afterwards and the getters already convey the intent of what is being passed as arguments.
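   For illustration, the inlined form might look like this (a minimal self-contained sketch; `JobConfig` and the monitor factory here are stand-ins for the real Samza classes, not the actual signatures):

```java
// Sketch of the "inline the getters" suggestion. JobConfig and createMonitor
// are simplified stand-ins so the example compiles on its own.
public class InlineGettersSketch {
  static class JobConfig {
    boolean getJobCoordinatorHighAvailabilityEnabled() { return false; }
    long getJobCoordinatorDynamicHeartbeatRetryCount() { return 5; }
    long getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs() { return 10000; }
  }

  // Stand-in for createContainerHeartbeatMonitor(...): just echoes its arguments.
  static String createMonitor(boolean haEnabled, long retries, long sleepMs) {
    return haEnabled + "/" + retries + "/" + sleepMs;
  }

  public static void main(String[] args) {
    JobConfig jobConfig = new JobConfig();
    // Inlined form: no intermediate locals; the getter names carry the intent.
    String monitor = createMonitor(
        jobConfig.getJobCoordinatorHighAvailabilityEnabled(),
        jobConfig.getJobCoordinatorDynamicHeartbeatRetryCount(),
        jobConfig.getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs());
    System.out.println(monitor);
  }
}
```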

##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -189,9 +197,13 @@ private static void run(
   /**
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
+   * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+   * @param isJobCoordinatorHighAvailabilityEnabled whether coordinator HA is enabled to fetch new coordinator url

Review comment:
       nit: missing docs for the newly added parameters.
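   For reference, the completed Javadoc might read as follows. This is a sketch only: the last two parameter names are inferred from the constructor call elsewhere in this PR and may not match the actual method signature.

```java
/**
 * Creates a new container heartbeat monitor if possible.
 * @param container the container to monitor
 * @param coordinatorStreamStore the metadata store to fetch the coordinator url from
 * @param isJobCoordinatorHighAvailabilityEnabled whether coordinator HA is enabled to fetch a new coordinator url
 * @param retryCount how many times to retry establishing a heartbeat with a new AM
 * @param sleepDurationForReconnectWithAM how long to sleep between reconnect attempts, in ms
 */
```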

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,79 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);

Review comment:
       we should instead pass in our own mocked executor, which runs the task passed to it and decrements the latch. Here is an example: https://github.com/apache/samza/pull/1334
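   The idea could be sketched like this (an assumed shape, not the exact approach in PR #1334): a deterministic scheduler that runs the fixed-rate task inline and counts down the latch, so the test never needs a timed wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Deterministic "scheduler" for tests: runs the fixed-rate task once, inline,
// and counts down a latch so the test can observe completion without sleeping.
public class DeterministicSchedulerSketch {
  static class InlineScheduler extends ScheduledThreadPoolExecutor {
    private final CountDownLatch latch;

    InlineScheduler(CountDownLatch latch) {
      super(1);
      this.latch = latch;
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
      command.run();      // run the heartbeat task synchronously
      latch.countDown();  // signal the test that the task has completed
      return null;        // the test under this sketch never uses the future
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    InlineScheduler scheduler = new InlineScheduler(latch);
    StringBuilder ran = new StringBuilder();
    scheduler.scheduleAtFixedRate(() -> ran.append("heartbeat"), 0, 1, TimeUnit.SECONDS);
    // No timed wait needed: the task already ran inline, so the latch is at zero.
    boolean completed = latch.await(0, TimeUnit.SECONDS);
    scheduler.shutdown();
    System.out.println(completed + ":" + ran);
  }
}
```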

##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA), where a new AM can establish connection with already running containers.|
+|job.coordinator.dynamic-heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with the AM, this count gives the number of times an already running container will attempt to establish heartbeat with the new AM.|
+|job.coordinator.dynamic-heartbeat.reconnect-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with the AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with the new AM.|

Review comment:
       Alluding to the same thread in the refactor PR, we should keep these configurations as YARN configurations.
   I'd also recommend removing `dynamic` from the configuration names, e.g. `yarn.container.heartbeat.retry.count` and `yarn.container.heartbeat.retry-sleep-duration.ms` or something similar.
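   Concretely, the renamed rows could look like the following (illustrative names only, with the defaults carried over from the current proposal):

```
|yarn.container.heartbeat.retry.count|5|If AM-HA is enabled, the number of times a running container will attempt to re-establish heartbeat with a new AM.|
|yarn.container.heartbeat.retry-sleep-duration.ms|10000|If AM-HA is enabled, the amount of time a running container will sleep between attempts to re-establish heartbeat with a new AM.|
```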

##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA), where a new AM can establish connection with already running containers.|

Review comment:
       I am leaning towards having this under the `YARN` namespace as well, since it is very specific to how we achieve availability in YARN w.r.t. the AM going down. E.g. in the ZK model, this doesn't mean much.
   IIRC, in the long run we want to abstract the container (expose only `StreamProcessor`) and keep it as an internal construct.
   @prateekm can you chime in with your opinion here?

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,79 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);

Review comment:
       Can we extract these to class-level constants?
   Perhaps name them `FAILURE_RESPONSE` and `SUCCESS_RESPONSE` to be explicit.
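   A self-contained sketch of the suggested constants (`ContainerHeartbeatResponse` is stubbed here so the example compiles standalone; the real Samza class may differ):

```java
// Sketch of the suggested class-level constants, named per the review comment.
public class HeartbeatResponseConstantsSketch {
  // Stub standing in for org.apache.samza.container.ContainerHeartbeatResponse.
  static class ContainerHeartbeatResponse {
    private final boolean alive;
    ContainerHeartbeatResponse(boolean alive) { this.alive = alive; }
    boolean isAlive() { return alive; }
  }

  // Explicitly named constants instead of response1/response2 locals.
  static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
  static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);

  public static void main(String[] args) {
    System.out.println(FAILURE_RESPONSE.isAlive() + "," + SUCCESS_RESPONSE.isAlive());
  }
}
```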




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

