mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r527005009



##########
File path: samza-core/src/main/java/org/apache/samza/config/JobConfig.java
##########
@@ -147,6 +147,11 @@
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability.
+  // High availability allows new AM to establish connection with already running containers
+  public static final String JOB_COORDINATOR_HIGH_AVAILABILITY_ENABLED = "job.coordinator.high-availability.enabled";

Review comment:
       Can we update the configuration table in the docs?

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -34,26 +38,45 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+  private static final int RETRY_COUNT = 5;

Review comment:
       Should this be configurable? In tandem with 
`sleepDurationForReconnectWithAM`, it determines how long we want the container 
to keep retrying to establish a connection with the AM.
   
   That said, it also determines how long you can potentially have overlapping 
(duplicate) containers if the AM dies and a new deployment happens. Hence, both 
of these should be configuration knobs, with good documentation on the impact 
of each.
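
   To make the trade-off concrete, here is a minimal sketch of what two such 
knobs could look like. The config keys, the class name, and the 10 s sleep 
default are hypothetical (only the retry count of 5 comes from this PR); the 
point is that the worst-case overlap window is roughly retry count times sleep 
duration, which the docs could state explicitly.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical sketch: the keys and the sleep default are illustrative
// assumptions, not names or values taken from the Samza codebase.
final class HeartbeatRetrySettings {
  static final String RETRY_COUNT_KEY = "example.heartbeat.reconnect.retry.count";
  static final String SLEEP_MS_KEY = "example.heartbeat.reconnect.sleep.ms";
  static final int DEFAULT_RETRY_COUNT = 5;      // matches RETRY_COUNT in the diff
  static final long DEFAULT_SLEEP_MS = 10_000L;  // assumed default

  final int retryCount;
  final long sleepMs;

  HeartbeatRetrySettings(Map<String, String> config) {
    this.retryCount = Integer.parseInt(
        config.getOrDefault(RETRY_COUNT_KEY, String.valueOf(DEFAULT_RETRY_COUNT)));
    this.sleepMs = Long.parseLong(
        config.getOrDefault(SLEEP_MS_KEY, String.valueOf(DEFAULT_SLEEP_MS)));
    if (retryCount < 1 || sleepMs < 0) {
      throw new IllegalArgumentException("retry count must be >= 1 and sleep must be >= 0");
    }
  }

  // Upper bound on the time spent sleeping while waiting for a new AM URL.
  // During this window an old container may still run next to a new one.
  Duration maxDiscoveryWindow() {
    return Duration.ofMillis(Math.multiplyExact((long) retryCount, sleepMs));
  }
}
```

   With the assumed defaults this gives a window of 50 seconds; lowering either 
value shortens the possible overlap but gives a new AM less time to come up.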

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,31 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    byte[] newCoordinatorUrl = serde.toBytes("http://some-host-2.prod.linkedin.com");
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.getContainerHeartbeatClient()).thenReturn(this.containerHeartbeatClient);

Review comment:
       This looks a bit strange: for the sake of the test, we are not actually 
recreating the heartbeat client.
   Ideally, you want to verify that a new client is created with the new URL 
and that it is the one that gets used.
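
   One possible way to make that verifiable, sketched under assumptions: build 
the client through an injected factory that takes the URL, so a test can record 
which URL each client was created for. `HeartbeatClient`, `Reconnector`, and 
`ReconnectorCheck` below are hypothetical stand-ins, not Samza classes, and the 
URLs are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a heartbeat client; not the Samza class.
interface HeartbeatClient {
  boolean isAlive();
}

// Hypothetical monitor fragment that creates its client via an injected factory.
final class Reconnector {
  private final Function<String, HeartbeatClient> clientFactory;
  private String coordinatorUrl;
  private HeartbeatClient client;

  Reconnector(String initialUrl, Function<String, HeartbeatClient> clientFactory) {
    this.clientFactory = clientFactory;
    this.coordinatorUrl = initialUrl;
    this.client = clientFactory.apply(initialUrl);
  }

  // Switches to a freshly created client for newUrl and returns its heartbeat result.
  boolean reconnect(String newUrl) {
    coordinatorUrl = newUrl;
    client = clientFactory.apply(newUrl);
    return client.isAlive();
  }

  String currentUrl() {
    return coordinatorUrl;
  }
}

final class ReconnectorCheck {
  // Returns the URLs the factory was asked to create clients for, in order.
  static List<String> run() {
    List<String> createdFor = new ArrayList<>();
    Function<String, HeartbeatClient> factory = url -> {
      createdFor.add(url);
      // Only the client pointing at the new AM reports a live heartbeat.
      return () -> url.equals("http://new-am.example");
    };
    Reconnector reconnector = new Reconnector("http://old-am.example", factory);
    if (!reconnector.reconnect("http://new-am.example")) {
      throw new IllegalStateException("expected a live heartbeat from the new AM");
    }
    return createdFor;
  }
}
```

   Because the heartbeat result depends on the URL the client was built with, 
the check fails if the old client is silently reused.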
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= RETRY_COUNT) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = getContainerHeartbeatClient();
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient getContainerHeartbeatClient() {

Review comment:
       minor: suggest naming this `createContainerHeartbeatClient` instead of 
`getContainerHeartbeatClient`.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {

Review comment:
       It might be nice to break down the check and the establishing parts into 
separate methods.
   [1] It makes it easier to write simpler tests that still test each piece of 
logic separately.
   [2] It allows each function to be handled and evolved independently, e.g. 
today we only retry the check, not the establishing.
   
   wdyt?
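
   A rough sketch of the split, assuming the URL lookup and the heartbeat call 
can be passed in as functions. `AmDiscovery` and its method names are 
hypothetical, not existing Samza code.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch; names are illustrative, not taken from Samza.
final class AmDiscovery {
  // Step 1 (check): poll for a coordinator URL that differs from the current one.
  // Polls up to retryCount times and sleeps sleepMs after each unsuccessful poll.
  // Returns empty if no new URL shows up or the thread is interrupted.
  static Optional<String> discoverNewUrl(
      String currentUrl, Supplier<String> urlSource, int retryCount, long sleepMs) {
    for (int attempt = 1; attempt <= retryCount; attempt++) {
      String candidate = urlSource.get();
      if (candidate != null && !candidate.equals(currentUrl)) {
        return Optional.of(candidate);
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
        return Optional.empty();
      }
    }
    return Optional.empty();
  }

  // Step 2 (establish): a single heartbeat attempt against the given URL.
  // No retry here, mirroring the current behaviour described in the review.
  static boolean establish(String url, Predicate<String> heartbeat) {
    return heartbeat.test(url);
  }
}
```

   Each step can then be tested on its own, and a retry policy for 
establishing could be added later without touching the discovery loop.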





