This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c7fbaa  SAMZA-2713: Add flag to disable container heartbeat monitor (#1567)
6c7fbaa is described below

commit 6c7fbaaffa5e403b43683aa9600644b1a596e06c
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Dec 8 10:45:06 2021 -0800

    SAMZA-2713: Add flag to disable container heartbeat monitor (#1567)
    
    API changes (backwards compatible): Specify "job.container.heartbeat.monitor.enabled" as true/false to enable/disable the ContainerHeartbeatMonitor. This defaults to true for backwards compatibility.
---
 .../java/org/apache/samza/config/JobConfig.java    |  7 +++++
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 34 +++++++++++++---------
 .../org/apache/samza/config/TestJobConfig.java     |  9 ++++++
 3 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 4822067..0b8f249 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -172,6 +172,9 @@ public class JobConfig extends MapConfig {
   public static final String YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";
   public static final long YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT = 10000;
 
+  public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled";
+  private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true;
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -459,4 +462,8 @@ public class JobConfig extends MapConfig {
   public boolean getStartpointEnabled() {
     return getBoolean(JOB_STARTPOINT_ENABLED, true);
   }
+
+  public boolean getContainerHeartbeatMonitorEnabled() {
+    return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index e9f4311..7314a86 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -216,21 +216,27 @@ public class ContainerLaunchUtil {
    */
   private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container,
       MetadataStore coordinatorStreamStore, Config config) {
-    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-    String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
-    if (executionEnvContainerId != null) {
-      log.info("Got execution environment container id: {}", executionEnvContainerId);
-      return new ContainerHeartbeatMonitor(() -> {
-        try {
-          container.shutdown();
-          containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
-        } catch (Exception e) {
-          log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
-          System.exit(1);
-        }
-      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, config);
+    if (new JobConfig(config).getContainerHeartbeatMonitorEnabled()) {
+      String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
+      String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
+      if (executionEnvContainerId != null) {
+        log.info("Got execution environment container id for container heartbeat monitor: {}", executionEnvContainerId);
+        return new ContainerHeartbeatMonitor(() -> {
+          try {
+            container.shutdown();
+            containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
+          } catch (Exception e) {
+            log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
+            System.exit(1);
+          }
+        }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, config);
+      } else {
+        log.warn("Container heartbeat monitor is enabled, but execution environment container id is not set. "
+            + "Container heartbeat monitor will not be created");
+        return null;
+      }
     } else {
-      log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
+      log.info("Container heartbeat monitor is disabled");
       return null;
     }
   }
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index af4bd1d..2b9d630 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -590,4 +590,13 @@ public class TestJobConfig {
     Assert.assertEquals(900, clusterManagerConfig.getContainerMemoryMb());
     Assert.assertEquals(2, clusterManagerConfig.getNumCores());
   }
+
+  @Test
+  public void testGetContainerHeartbeatMonitorEnabled() {
+    assertTrue(new JobConfig(new MapConfig()).getContainerHeartbeatMonitorEnabled());
+    assertTrue(new JobConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.CONTAINER_HEARTBEAT_MONITOR_ENABLED, "true"))).getContainerHeartbeatMonitorEnabled());
+    assertFalse(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.CONTAINER_HEARTBEAT_MONITOR_ENABLED,
+        "false"))).getContainerHeartbeatMonitorEnabled());
+  }
 }
\ No newline at end of file

Reply via email to