This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b19cd6  SAMZA-2323: Provide option allow single containers to fail without failing the job (#1156)
6b19cd6 is described below

commit 6b19cd660c5d7e28a59aea97a934aa58e9eb1a95
Author: Daniel Nishimura <[email protected]>
AuthorDate: Wed Sep 25 17:39:56 2019 -0700

    SAMZA-2323: Provide option allow single containers to fail without failing the job (#1156)
    
    * SAMZA-2323: Provide option to stop/fail single containers
---
 .../versioned/jobs/samza-configurations.md         |  1 +
 .../clustermanager/ContainerProcessManager.java    | 32 +++++++++++++++------
 .../apache/samza/config/ClusterManagerConfig.java  | 16 +++++++++++
 .../TestContainerProcessManager.java               | 33 ++++++++++++++--------
 4 files changed, 61 insertions(+), 21 deletions(-)
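
The retry decision changed by this commit can be summarized in a small standalone sketch. It is not the Samza implementation: the class name RetryDecisionSketch, the Decision holder, and the decide method are hypothetical names used only for illustration. The sketch assumes that currentFailCount already counts only failures that fell inside cluster-manager.container.retry.window.ms, and it treats jobFailureCriteriaMet as a fresh value rather than a field that may already be set.

```java
// Hypothetical, simplified model of the retry decision in this commit.
// It does not use any Samza classes.
final class RetryDecisionSketch {

  /** Outcome of handling a single container failure. */
  static final class Decision {
    final boolean retryContainerRequest;
    final boolean jobFailureCriteriaMet;

    Decision(boolean retryContainerRequest, boolean jobFailureCriteriaMet) {
      this.retryContainerRequest = retryContainerRequest;
      this.jobFailureCriteriaMet = jobFailureCriteriaMet;
    }
  }

  /**
   * @param retryCount          value of cluster-manager.container.retry.count
   * @param currentFailCount    failures counted within the retry window (assumed precomputed)
   * @param failJobAfterRetries value of cluster-manager.container.fail.job.after.retries
   */
  static Decision decide(int retryCount, int currentFailCount, boolean failJobAfterRetries) {
    if (retryCount == 0) {
      // Retries are disabled: never re-request the container; fail the job only if configured to.
      return new Decision(false, failJobAfterRetries);
    }
    if (retryCount > 0 && currentFailCount > retryCount) {
      // Retries are exhausted: stop re-requesting; fail the job only if configured to.
      return new Decision(false, failJobAfterRetries);
    }
    // Retries remain (or retryCount is negative): request the container again.
    return new Decision(true, false);
  }

  public static void main(String[] args) {
    Decision d = decide(3, 4, false);
    System.out.println("retry=" + d.retryContainerRequest + ", failJob=" + d.jobFailureCriteriaMet);
  }
}
```

With the option set to false, an exhausted container yields neither a new container request nor a job failure, which matches the updated test expectation that shouldShutdown() is false while no requests are pending.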

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 2309555..a247460 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -296,6 +296,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |cluster-manager.container.retry.count|8|If a container fails, it is automatically restarted by Samza. However, if a container keeps failing shortly after startup, that indicates a deeper problem, so we should kill the job rather than retrying indefinitely. This property determines the maximum number of times we are willing to restart a failed container in quick succession (the time period is configured with `cluster-manager.container.retry.window.ms`). Each container in the job is count [...]
 |cluster-manager.container.retry.window.ms|300000|This property determines how frequently a container is allowed to fail before we give up and fail the job. If the same container has failed more than `cluster-manager.container.retry.count` times, and the time between failures was less than this property `cluster-manager.container.retry.window.ms` (in milliseconds), then we fail the job. There is no limit to the number of times we will restart a container if the time between failures is g [...]
 |cluster-manager.container.preferred-host.last.retry.delay.ms|360000|The delay added to the last retry for a failing container after all but one of cluster-manager.container.retry.count retries have been exhausted. The delay is only added when `job.host-affinity.enabled` is true and the retried request is for a preferred host. This addresses the issue where there may be a delay when a preferred host is marked invalid and the container continuously attempts to restart and fail on the inva [...]
+|cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count`s are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use cases of setting this to false is to aid i [...]
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 8e12f82..68c16f7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -513,12 +513,19 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     int retryCount = clusterManagerConfig.getContainerRetryCount();
     int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
     int currentFailCount;
+    boolean retryContainerRequest = true;
 
     if (retryCount == 0) {
-      LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
-          "so shutting down the application master and marking the job as failed.", processorId, containerId);
-
-      jobFailureCriteriaMet = true;
+      // Failure criteria met only if failed containers can fail the job.
+      jobFailureCriteriaMet = clusterManagerConfig.shouldFailJobAfterContainerRetries();
+      if (jobFailureCriteriaMet) {
+        LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
+            "so shutting down the application master and marking the job as failed.", processorId, containerId);
+      } else {
+        LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
+            "but the job will continue to run with the failed container.", processorId, containerId);
+      }
+      retryContainerRequest = false;
     } else if (retryCount > 0) {
       long durationSinceLastRetryMs;
       if (processorFailures.containsKey(processorId)) {
@@ -549,14 +556,20 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       // if fail count is (1 initial failure + max retries) then fail job.
       if (currentFailCount > retryCount) {
         LOG.error("Processor ID: {} (current Container ID: {}) has failed {} times, with last failure {} ms ago. " +
-                "This is greater than retry count of {} and window of {} ms, " +
-                "so shutting down the application master and marking the job as failed.",
+                "This is greater than retry count of {} and window of {} ms, ",
             processorId, containerId, currentFailCount, durationSinceLastRetryMs, retryCount, retryWindowMs);
 
         // We have too many failures, and we're within the window
         // boundary, so reset shut down the app master.
-        jobFailureCriteriaMet = true;
-        state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+        retryContainerRequest = false;
+        if (clusterManagerConfig.shouldFailJobAfterContainerRetries()) {
+          jobFailureCriteriaMet = true;
+          LOG.error("Shutting down the application master and marking the job as failed after max retry attempts.");
+          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+        } else {
+          LOG.warn("Processor ID: {} with Container ID: {} failed after all retry attempts. Job will continue to run without this container.",
+              processorId, containerId);
+        }
       } else {
         LOG.info("Current failure count for Processor ID: {} is {}.", processorId, currentFailCount);
         Duration retryDelay = Duration.ZERO;
@@ -565,10 +578,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
           retryDelay = Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
         }
         processorFailures.put(processorId, new ProcessorFailure(currentFailCount, now, retryDelay));
+        retryContainerRequest = true;
       }
     }
 
-    if (!jobFailureCriteriaMet) {
+    if (retryContainerRequest) {
       Duration retryDelay = getRetryDelay(processorId);
       if (!retryDelay.isZero()) {
         LOG.info("Adding a delay of: {} seconds on the last container retry request for preferred host: {}",
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 18d3954..8e1b759 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -91,6 +91,12 @@ public class ClusterManagerConfig extends MapConfig {
   public static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
 
   /**
+   * Determines if a job should fail after any container has exhausted all its retries.
+   */
+  public static final String CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES = "cluster-manager.container.fail.job.after.retries";
+  public static final boolean DEFAULT_CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES = true;
+
+  /**
    * Maximum delay in milliseconds for the last container retry
    */
   public static final String CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS =
@@ -192,6 +198,16 @@ public class ClusterManagerConfig extends MapConfig {
     }
   }
 
+  /**
+   * The value of {@link ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES} that determines if the
+   * job will fail if any container has exhausted all its retries and each retry is within the {@link ClusterManagerConfig#CLUSTER_MANAGER_RETRY_WINDOW_MS}.
+   * @return true if the job should fail after any container has exhausted all its retries; otherwise, false.
+   */
+  public boolean shouldFailJobAfterContainerRetries() {
+    return getBoolean(CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES,
+        DEFAULT_CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES);
+  }
+
   public long getContainerPreferredHostLastRetryDelayMs() {
     if (containsKey(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS)) {
       return getLong(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS) + CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA;
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 997adb1..5347d52 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -89,14 +89,15 @@ public class TestContainerProcessManager {
   }
 
   private Config getConfigWithHostAffinity() {
-    return getConfigWithHostAffinityAndRetries(true, 1);
+    return getConfigWithHostAffinityAndRetries(true, 1, true);
   }
 
-  private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity, int maxRetries) {
+  private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity, int maxRetries, boolean failAfterRetries) {
     Map<String, String> map = new HashMap<>();
     map.putAll(config);
     map.put("job.host-affinity.enabled", String.valueOf(withHostAffinity));
     map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_RETRY_COUNT, String.valueOf(maxRetries));
+    map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES, String.valueOf(failAfterRetries));
     return new MapConfig(map);
   }
 
@@ -366,7 +367,8 @@ public class TestContainerProcessManager {
    */
   @Test
   public void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity() throws Exception {
-    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false);
+    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false, true);
+    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false, false);
   }
 
   /**
@@ -375,13 +377,14 @@ public class TestContainerProcessManager {
    */
   @Test
   public void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithHostAffinity() throws Exception {
-    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true);
+    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true, true);
+    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true, false);
   }
 
-  private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity) throws Exception {
+  private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity, boolean failAfterRetries) throws Exception {
     int maxRetries = 3;
     String processorId = "0";
-    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries));
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
@@ -450,18 +453,20 @@ public class TestContainerProcessManager {
 
   @Test
   public void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity() throws Exception {
-    testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false);
+    testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false, true);
+    testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false, false);
   }
 
   @Test
   public void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithHostAffinity() throws Exception {
-    testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true);
+    testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true, true);
+    testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true, false);
   }
 
-  private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity) throws Exception {
+  private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(boolean withHostAffinity, boolean failAfterRetries) throws Exception {
     int maxRetries = 3;
     String processorId = "0";
-    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries));
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
@@ -550,9 +555,13 @@ public class TestContainerProcessManager {
     // Mock 4th failure not exceeding retry window.
     cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3, Instant.now(), Duration.ZERO));
     cpm.onResourceCompleted(new SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
-    assertEquals(true, cpm.getJobFailureCriteriaMet()); // expecting failed container
+    assertEquals(failAfterRetries, cpm.getJobFailureCriteriaMet()); // expecting failed container
     assertEquals(3, cpm.getProcessorFailures().get(processorId).getCount()); // count won't update on failure
-    assertTrue(cpm.shouldShutdown());
+    if (failAfterRetries) {
+      assertTrue(cpm.shouldShutdown());
+    } else {
+      assertFalse(cpm.shouldShutdown());
+    }
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
     assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
 
