mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r527005009
##########
File path: samza-core/src/main/java/org/apache/samza/config/JobConfig.java
##########
@@ -147,6 +147,11 @@
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability.
+  // High availability allows new AM to establish connection with already running containers
+  public static final String JOB_COORDINATOR_HIGH_AVAILABILITY_ENABLED = "job.coordinator.high-availability.enabled";
Review comment:
Can we add `job.coordinator.high-availability.enabled` to the configuration table in the docs?
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -34,26 +38,45 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+  private static final int RETRY_COUNT = 5;
Review comment:
Should this be configurable? Together with `sleepDurationForReconnectWithAM`, it determines how long the container keeps retrying to establish a connection with the AM.
That also means it bounds how long you can potentially have overlapping (duplicate) containers if the AM dies and a new deployment happens. Hence, both of these should be configuration knobs, with good documentation on the impact of each.
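A minimal sketch of what the two knobs could look like, assuming they are read from a plain `Map<String, String>`. The config keys, the `AmReconnectSettings` class, and the 10-second default sleep are hypothetical; only the default retry count of 5 comes from the diff. The sketch makes the trade-off explicit: `maxReconnectWindowMs()` is both how long a container waits for a new AM and roughly how long a duplicate container could overlap with a new deployment.

```java
import java.util.Map;

// Hypothetical settings holder for the two reconnect knobs discussed above.
// The config keys and DEFAULT_SLEEP_MS are illustrative assumptions, not existing Samza configs.
final class AmReconnectSettings {
  static final String RETRY_COUNT_KEY = "job.container.heartbeat.am-reconnect.retry.count"; // hypothetical key
  static final String SLEEP_MS_KEY = "job.container.heartbeat.am-reconnect.sleep.ms";       // hypothetical key
  static final int DEFAULT_RETRY_COUNT = 5;     // matches the hard-coded RETRY_COUNT in the diff
  static final long DEFAULT_SLEEP_MS = 10_000L; // illustrative default only

  final int retryCount;
  final long sleepMs;

  AmReconnectSettings(Map<String, String> config) {
    this.retryCount = Integer.parseInt(config.getOrDefault(RETRY_COUNT_KEY, String.valueOf(DEFAULT_RETRY_COUNT)));
    this.sleepMs = Long.parseLong(config.getOrDefault(SLEEP_MS_KEY, String.valueOf(DEFAULT_SLEEP_MS)));
    if (retryCount < 1 || sleepMs < 0) {
      throw new IllegalArgumentException("retry count must be >= 1 and sleep must be >= 0 ms");
    }
  }

  // In the diff's loop, every attempt that still sees the old AM URL sleeps once, so the container
  // can keep waiting for roughly retryCount * sleepMs. This is also the window in which a container
  // orphaned by a dead AM may overlap with a duplicate container from a new deployment.
  long maxReconnectWindowMs() {
    return retryCount * sleepMs;
  }
}
```

In Samza itself these values would presumably be read from the job's `Config` rather than a raw map, with each key's documentation spelling out the resulting duplicate-container window.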
##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,31 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    byte[] newCoordinatorUrl = serde.toBytes("http://some-host-2.prod.linkedin.com");
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.getContainerHeartbeatClient()).thenReturn(this.containerHeartbeatClient);
Review comment:
This looks a bit strange: for the sake of the test, the stub hands back the existing heartbeat client instead of creating a new one.
Ideally, the test should verify that a new client is created with the new URL and that this new client is the one being used.
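One way to let the test verify client recreation is to inject a client factory and have the test record which URLs it was asked to build clients for. The sketch below is hypothetical (`ReconnectingMonitor`, `AmHeartbeatClient`, and `RecordingClientFactory` are illustrative names, not Samza classes) and uses a hand-rolled fake instead of Mockito so that it is self-contained; with Mockito, a mocked factory plus `verify(factory).apply(newUrl)` would express the same check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical minimal client; isAlive() stands in for requestHeartbeat().isAlive().
interface AmHeartbeatClient {
  boolean isAlive();
}

// Hypothetical monitor fragment that receives a client factory instead of calling an overridable
// getContainerHeartbeatClient(), so a test can observe which URL each client was built for.
class ReconnectingMonitor {
  private final Function<String, AmHeartbeatClient> clientFactory;
  private String coordinatorUrl;
  private AmHeartbeatClient client;

  ReconnectingMonitor(String initialUrl, Function<String, AmHeartbeatClient> clientFactory) {
    this.clientFactory = clientFactory;
    this.coordinatorUrl = initialUrl;
    this.client = clientFactory.apply(initialUrl);
  }

  // Switch to a newly discovered AM: build a fresh client for the new URL and heartbeat through it.
  boolean reconnectTo(String newUrl) {
    coordinatorUrl = newUrl;
    client = clientFactory.apply(newUrl);
    return client.isAlive();
  }

  String coordinatorUrl() {
    return coordinatorUrl;
  }
}

// Test-only fake that records every URL it builds a client for. Only clients bound to aliveUrl
// report alive, so a test fails if the monitor keeps heartbeating through a stale client.
class RecordingClientFactory implements Function<String, AmHeartbeatClient> {
  final List<String> requestedUrls = new ArrayList<>();
  private final String aliveUrl;

  RecordingClientFactory(String aliveUrl) {
    this.aliveUrl = aliveUrl;
  }

  @Override
  public AmHeartbeatClient apply(String url) {
    requestedUrls.add(url);
    return () -> url.equals(aliveUrl);
  }
}
```

The test then asserts on `requestedUrls` and on the heartbeat result, rather than stubbing the creation method to return the old client.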
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= RETRY_COUNT) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = getContainerHeartbeatClient();
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient getContainerHeartbeatClient() {
Review comment:
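Minor: suggest naming this with `create` instead of `get` (e.g. `createContainerHeartbeatClient`), since it builds a new client for the current coordinator URL.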
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
+  private boolean checkAndEstablishConnectionWithNewAM() {
Review comment:
It might be nice to break down the check and the establishing parts:
[1] It makes it easier to write simpler tests that still exercise each piece of logic separately.
[2] It allows each modular function to be handled and evolved independently, e.g. today we retry only the check, not the connection establishment.
wdyt?
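A sketch of the suggested split, assuming a `Supplier<String>` in place of the coordinator stream read and a `Function<String, HeartbeatProbe>` in place of `getContainerHeartbeatClient()`; all names here are hypothetical. The check step owns the polling and retry policy, the establish step can grow its own retries later, and the composed method keeps the original contract of returning `true` only when a new AM was found and reported the container alive. It deviates from the diff in two small ways, both noted in comments: it does not sleep after the final attempt, and it stops polling when interrupted.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for ContainerHeartbeatClient: isAlive() mirrors requestHeartbeat().isAlive().
interface HeartbeatProbe {
  boolean isAlive();
}

// Sketch of splitting checkAndEstablishConnectionWithNewAM() into a check step and an establish step.
// The Supplier stands in for reading YARN_COORDINATOR_URL from the coordinator stream store.
class SplitReconnectSketch {
  private final Supplier<String> coordinatorUrlReader;
  private final Function<String, HeartbeatProbe> clientFactory;
  private final int retryCount;
  private final long sleepMs;
  private String coordinatorUrl;

  SplitReconnectSketch(String initialUrl, Supplier<String> coordinatorUrlReader,
      Function<String, HeartbeatProbe> clientFactory, int retryCount, long sleepMs) {
    this.coordinatorUrl = initialUrl;
    this.coordinatorUrlReader = coordinatorUrlReader;
    this.clientFactory = clientFactory;
    this.retryCount = retryCount;
    this.sleepMs = sleepMs;
  }

  // Check: poll for an AM URL that differs from the current one, retrying up to retryCount times.
  Optional<String> findNewCoordinatorUrl() {
    for (int attempt = 1; attempt <= retryCount; attempt++) {
      String candidate = coordinatorUrlReader.get();
      // A missing URL is treated as "no new AM yet".
      if (candidate != null && !candidate.equals(coordinatorUrl)) {
        return Optional.of(candidate);
      }
      if (attempt < retryCount) { // unlike the diff, skip the pointless sleep after the final attempt
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag and stop polling instead of spinning
          return Optional.empty();
        }
      }
    }
    return Optional.empty();
  }

  // Establish: switch to the new AM and send one heartbeat. Retries could be added here
  // independently of the check's retry policy.
  boolean establishConnection(String newUrl) {
    coordinatorUrl = newUrl;
    return clientFactory.apply(newUrl).isAlive();
  }

  // Composition that keeps the original contract: true only if a new AM was found and is alive.
  boolean checkAndEstablishConnectionWithNewAM() {
    Optional<String> newUrl = findNewCoordinatorUrl();
    return newUrl.isPresent() && establishConnection(newUrl.get());
  }

  String currentCoordinatorUrl() {
    return coordinatorUrl;
  }
}
```

Each step can now be tested on its own: `findNewCoordinatorUrl()` against a scripted sequence of URLs, and `establishConnection()` against a fake client factory.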
----------------------------------------------------------------
This is an automated message from the Apache Git Service.