This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6c7fbaa SAMZA-2713: Add flag to disable container heartbeat monitor (#1567)
6c7fbaa is described below
commit 6c7fbaaffa5e403b43683aa9600644b1a596e06c
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Dec 8 10:45:06 2021 -0800
SAMZA-2713: Add flag to disable container heartbeat monitor (#1567)
API changes (backwards compatible): Specify
"job.container.heartbeat.monitor.enabled" as true/false to enable/disable the
ContainerHeartbeatMonitor. This defaults to true for backwards compatibility.
---
.../java/org/apache/samza/config/JobConfig.java | 7 +++++
.../apache/samza/runtime/ContainerLaunchUtil.java | 34 +++++++++++++---------
.../org/apache/samza/config/TestJobConfig.java | 9 ++++++
3 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 4822067..0b8f249 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -172,6 +172,9 @@ public class JobConfig extends MapConfig {
public static final String YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS
= "yarn.container.heartbeat.retry-sleep-duration.ms";
public static final long
YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT = 10000;
+ public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED =
"job.container.heartbeat.monitor.enabled";
+ private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT =
true;
+
public JobConfig(Config config) {
super(config);
}
@@ -459,4 +462,8 @@ public class JobConfig extends MapConfig {
public boolean getStartpointEnabled() {
return getBoolean(JOB_STARTPOINT_ENABLED, true);
}
+
+ public boolean getContainerHeartbeatMonitorEnabled() {
+ return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED,
CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
+ }
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index e9f4311..7314a86 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -216,21 +216,27 @@ public class ContainerLaunchUtil {
*/
private static ContainerHeartbeatMonitor
createContainerHeartbeatMonitor(SamzaContainer container,
MetadataStore coordinatorStreamStore, Config config) {
- String coordinatorUrl =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
- String executionEnvContainerId =
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
- if (executionEnvContainerId != null) {
- log.info("Got execution environment container id: {}",
executionEnvContainerId);
- return new ContainerHeartbeatMonitor(() -> {
- try {
- container.shutdown();
- containerRunnerException = new SamzaException("Container shutdown
due to expired heartbeat");
- } catch (Exception e) {
- log.error("Heartbeat monitor failed to shutdown the container
gracefully. Exiting process.", e);
- System.exit(1);
- }
- }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore,
config);
+ if (new JobConfig(config).getContainerHeartbeatMonitorEnabled()) {
+ String coordinatorUrl =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
+ String executionEnvContainerId =
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
+ if (executionEnvContainerId != null) {
+ log.info("Got execution environment container id for container
heartbeat monitor: {}", executionEnvContainerId);
+ return new ContainerHeartbeatMonitor(() -> {
+ try {
+ container.shutdown();
+ containerRunnerException = new SamzaException("Container shutdown
due to expired heartbeat");
+ } catch (Exception e) {
+ log.error("Heartbeat monitor failed to shutdown the container
gracefully. Exiting process.", e);
+ System.exit(1);
+ }
+ }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore,
config);
+ } else {
+ log.warn("Container heartbeat monitor is enabled, but execution
environment container id is not set. "
+ + "Container heartbeat monitor will not be created");
+ return null;
+ }
} else {
- log.warn("Execution environment container id not set. Container
heartbeat monitor will not be created");
+ log.info("Container heartbeat monitor is disabled");
return null;
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index af4bd1d..2b9d630 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -590,4 +590,13 @@ public class TestJobConfig {
Assert.assertEquals(900, clusterManagerConfig.getContainerMemoryMb());
Assert.assertEquals(2, clusterManagerConfig.getNumCores());
}
+
+ @Test
+ public void testGetContainerHeartbeatMonitorEnabled() {
+ assertTrue(new JobConfig(new
MapConfig()).getContainerHeartbeatMonitorEnabled());
+ assertTrue(new JobConfig(new MapConfig(
+ ImmutableMap.of(JobConfig.CONTAINER_HEARTBEAT_MONITOR_ENABLED,
"true"))).getContainerHeartbeatMonitorEnabled());
+ assertFalse(new JobConfig(new
MapConfig(ImmutableMap.of(JobConfig.CONTAINER_HEARTBEAT_MONITOR_ENABLED,
+ "false"))).getContainerHeartbeatMonitorEnabled());
+ }
}
\ No newline at end of file