This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6b19cd6 SAMZA-2323: Provide option to allow single containers to fail without failing the job (#1156)
6b19cd6 is described below
commit 6b19cd660c5d7e28a59aea97a934aa58e9eb1a95
Author: Daniel Nishimura <[email protected]>
AuthorDate: Wed Sep 25 17:39:56 2019 -0700
SAMZA-2323: Provide option to allow single containers to fail without failing the job (#1156)
* SAMZA-2323: Provide option to stop/fail single containers
---
.../versioned/jobs/samza-configurations.md | 1 +
.../clustermanager/ContainerProcessManager.java | 32 +++++++++++++++------
.../apache/samza/config/ClusterManagerConfig.java | 16 +++++++++++
.../TestContainerProcessManager.java | 33 ++++++++++++++--------
4 files changed, 61 insertions(+), 21 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 2309555..a247460 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -296,6 +296,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
|cluster-manager.container.retry.count|8|If a container fails, it is automatically restarted by Samza. However, if a container keeps failing shortly after startup, that indicates a deeper problem, so we should kill the job rather than retrying indefinitely. This property determines the maximum number of times we are willing to restart a failed container in quick succession (the time period is configured with `cluster-manager.container.retry.window.ms`). Each container in the job is count [...]
|cluster-manager.container.retry.window.ms|300000|This property determines how frequently a container is allowed to fail before we give up and fail the job. If the same container has failed more than `cluster-manager.container.retry.count` times, and the time between failures was less than this property `cluster-manager.container.retry.window.ms` (in milliseconds), then we fail the job. There is no limit to the number of times we will restart a container if the time between failures is g [...]
|cluster-manager.container.preferred-host.last.retry.delay.ms|360000|The delay added to the last retry for a failing container after all but one of cluster-manager.container.retry.count retries have been exhausted. The delay is only added when `job.host-affinity.enabled` is true and the retried request is for a preferred host. This addresses the issue where there may be a delay when a preferred host is marked invalid and the container continuously attempts to restart and fail on the inva [...]
+|cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count` retries are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use case for setting this to false is to aid i [...]
|cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
|cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
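Taken together, a job that should tolerate the loss of individual containers would combine the retry settings above with the new flag. A minimal, illustrative config sketch (property names are from the table above; the surrounding values are examples only, not recommendations):

```properties
# Retry a failed container up to 8 times within a 5-minute window.
cluster-manager.container.retry.count=8
cluster-manager.container.retry.window.ms=300000
# New in SAMZA-2323: once retries are exhausted, keep the job running
# without the failed container instead of failing the whole job.
cluster-manager.container.fail.job.after.retries=false
```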
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 8e12f82..68c16f7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -513,12 +513,19 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
int retryCount = clusterManagerConfig.getContainerRetryCount();
int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
int currentFailCount;
+ boolean retryContainerRequest = true;
if (retryCount == 0) {
- LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
- "so shutting down the application master and marking the job as failed.", processorId, containerId);
-
- jobFailureCriteriaMet = true;
+ // Failure criteria met only if failed containers can fail the job.
+ jobFailureCriteriaMet = clusterManagerConfig.shouldFailJobAfterContainerRetries();
+ if (jobFailureCriteriaMet) {
+ LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
+ "so shutting down the application master and marking the job as failed.", processorId, containerId);
+ } else {
+ LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
+ "but the job will continue to run with the failed container.", processorId, containerId);
+ }
+ }
+ retryContainerRequest = false;
} else if (retryCount > 0) {
long durationSinceLastRetryMs;
if (processorFailures.containsKey(processorId)) {
@@ -549,14 +556,20 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
// if fail count is (1 initial failure + max retries) then fail job.
if (currentFailCount > retryCount) {
LOG.error("Processor ID: {} (current Container ID: {}) has failed {} times, with last failure {} ms ago. " +
- "This is greater than retry count of {} and window of {} ms, " +
- "so shutting down the application master and marking the job as failed.",
+ "This is greater than retry count of {} and window of {} ms, ",
processorId, containerId, currentFailCount, durationSinceLastRetryMs, retryCount, retryWindowMs);
// We have too many failures, and we're within the window
// boundary, so reset shut down the app master.
- jobFailureCriteriaMet = true;
- state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ retryContainerRequest = false;
+ if (clusterManagerConfig.shouldFailJobAfterContainerRetries()) {
+ jobFailureCriteriaMet = true;
+ LOG.error("Shutting down the application master and marking the job as failed after max retry attempts.");
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ } else {
+ LOG.warn("Processor ID: {} with Container ID: {} failed after all retry attempts. Job will continue to run without this container.", processorId, containerId);
+ }
} else {
LOG.info("Current failure count for Processor ID: {} is {}.", processorId, currentFailCount);
Duration retryDelay = Duration.ZERO;
@@ -565,10 +578,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
retryDelay = Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
}
processorFailures.put(processorId, new ProcessorFailure(currentFailCount, now, retryDelay));
+ retryContainerRequest = true;
}
}
- if (!jobFailureCriteriaMet) {
+ if (retryContainerRequest) {
Duration retryDelay = getRetryDelay(processorId);
if (!retryDelay.isZero()) {
LOG.info("Adding a delay of: {} seconds on the last container retry request for preferred host: {}",
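Condensed into a standalone form, the control flow above reduces to a three-way decision: retry the container, fail the whole job, or keep the job running without the container. The following is a simplified sketch of that decision (a hypothetical `RetryDecision` class, not Samza's actual code; it ignores the negative-retry-count and retry-delay cases):

```java
public class RetryDecision {
    public enum Outcome { RETRY_CONTAINER, FAIL_JOB, RUN_WITHOUT_CONTAINER }

    public static Outcome decide(int retryCount, int currentFailCount,
                                 boolean withinRetryWindow, boolean failJobAfterRetries) {
        if (retryCount == 0) {
            // Retries disabled: the new flag decides between failing the job
            // and running without the container.
            return failJobAfterRetries ? Outcome.FAIL_JOB : Outcome.RUN_WITHOUT_CONTAINER;
        }
        if (currentFailCount > retryCount && withinRetryWindow) {
            // Retries exhausted inside the window: same decision point.
            return failJobAfterRetries ? Outcome.FAIL_JOB : Outcome.RUN_WITHOUT_CONTAINER;
        }
        // Otherwise request a replacement container.
        return Outcome.RETRY_CONTAINER;
    }

    public static void main(String[] args) {
        System.out.println(decide(3, 4, true, true));   // FAIL_JOB
        System.out.println(decide(3, 4, true, false));  // RUN_WITHOUT_CONTAINER
        System.out.println(decide(3, 2, true, true));   // RETRY_CONTAINER
    }
}
```

Note that before this change there were only two outcomes; `retryContainerRequest` exists so that "don't retry" no longer implies "fail the job".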
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 18d3954..8e1b759 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -91,6 +91,12 @@ public class ClusterManagerConfig extends MapConfig {
public static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
/**
+ * Determines if a job should fail after any container has exhausted all its retries.
+ */
+ public static final String CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES = "cluster-manager.container.fail.job.after.retries";
+ public static final boolean DEFAULT_CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES = true;
+
+ /**
* Maximum delay in milliseconds for the last container retry
*/
public static final String CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS =
@@ -192,6 +198,16 @@ public class ClusterManagerConfig extends MapConfig {
}
}
+ /**
+ * The value of {@link ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES} that determines if the
+ * job will fail if any container has exhausted all its retries and each retry is within the {@link ClusterManagerConfig#CLUSTER_MANAGER_RETRY_WINDOW_MS}.
+ * @return true if the job should fail after any container has exhausted all its retries; otherwise, false.
+ */
+ public boolean shouldFailJobAfterContainerRetries() {
+ return getBoolean(CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES, DEFAULT_CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES);
+ }
+
public long getContainerPreferredHostLastRetryDelayMs() {
if (containsKey(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS)) {
return getLong(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS) + CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA;
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 997adb1..5347d52 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -89,14 +89,15 @@ public class TestContainerProcessManager {
}
private Config getConfigWithHostAffinity() {
- return getConfigWithHostAffinityAndRetries(true, 1);
+ return getConfigWithHostAffinityAndRetries(true, 1, true);
}
- private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity, int maxRetries) {
+ private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity, int maxRetries, boolean failAfterRetries) {
Map<String, String> map = new HashMap<>();
map.putAll(config);
map.put("job.host-affinity.enabled", String.valueOf(withHostAffinity));
map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_RETRY_COUNT, String.valueOf(maxRetries));
+ map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES, String.valueOf(failAfterRetries));
return new MapConfig(map);
}
@@ -366,7 +367,8 @@ public class TestContainerProcessManager {
*/
@Test
public void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity() throws Exception {
- testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false);
+ testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false, true);
+ testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false, false);
}
/**
@@ -375,13 +377,14 @@ public class TestContainerProcessManager {
*/
@Test
public void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithHostAffinity() throws Exception {
- testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true);
+ testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true, true);
+ testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true, false);
}
- private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity) throws Exception {
+ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity, boolean failAfterRetries) throws Exception {
int maxRetries = 3;
String processorId = "0";
- ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries));
+ ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
@@ -450,18 +453,20 @@ public class TestContainerProcessManager {
@Test
public void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity() throws Exception {
- testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false);
+ testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false, true);
+ testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false, false);
}
@Test
public void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithHostAffinity() throws Exception {
- testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true);
+ testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true, true);
+ testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true, false);
}
- private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity) throws Exception {
+ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity, boolean failAfterRetries) throws Exception {
int maxRetries = 3;
String processorId = "0";
- ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries));
+ ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
@@ -550,9 +555,13 @@ public class TestContainerProcessManager {
// Mock 4th failure not exceeding retry window.
cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3, Instant.now(), Duration.ZERO));
cpm.onResourceCompleted(new SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
- assertEquals(true, cpm.getJobFailureCriteriaMet()); // expecting failed container
+ assertEquals(failAfterRetries, cpm.getJobFailureCriteriaMet()); // expecting failed container
assertEquals(3, cpm.getProcessorFailures().get(processorId).getCount()); // count won't update on failure
- assertTrue(cpm.shouldShutdown());
+ if (failAfterRetries) {
+ assertTrue(cpm.shouldShutdown());
+ } else {
+ assertFalse(cpm.shouldShutdown());
+ }
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());