rmatharu commented on a change in pull request #1104: SAMZA-2266: Introduce a backoff when there are repeated failures for host-affinity allocations
URL: https://github.com/apache/samza/pull/1104#discussion_r305027652
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 ##########
 @@ -524,10 +468,129 @@ public void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t)
    */
   @Override
   public void onError(Throwable e) {
-    log.error("Exception occurred in callbacks in the Cluster Resource Manager", e);
+    LOG.error("Exception occurred in callbacks in the Cluster Resource Manager", e);
     exceptionOccurred = e;
   }
 
+  /**
+   * Called within {@link #onResourceCompleted(SamzaResourceStatus)} for unknown exit statuses. These exit statuses
+   * correspond to container completion other than container run-to-completion, abort or preemption, or disk failure
+   * (e.g., detected by YARN's NM healthchecks).
+   * @param resourceStatus reported resource status.
+   * @param containerId container ID
+   * @param processorId processor ID (aka. logical container ID)
+   * @param exitStatus exit status from the {@link #onResourceCompleted(SamzaResourceStatus)} callback.
+   */
+  @VisibleForTesting
+  void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, String containerId, String processorId,
+      int exitStatus) {
+    LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus);
+
+    state.failedContainers.incrementAndGet();
+    state.failedContainersStatus.put(containerId, resourceStatus);
+    state.jobHealthy.set(false);
+
+    state.neededProcessors.incrementAndGet();
+    // Find out previously running container location
+    String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
+    if (!hostAffinityEnabled || lastSeenOn == null) {
+      lastSeenOn = ResourceRequestState.ANY_HOST;
+    }
+    LOG.info("Container ID: {} for Processor ID: {} was last seen on host {}.", containerId, processorId, lastSeenOn);
+    // A container failed for an unknown reason. Let's check to see if
+    // we need to shutdown the whole app master if too many container
+    // failures have happened. The rules for failing are that the
+    // failure count for a task group id must be > the configured retry
+    // count, and the last failure (the one prior to this one) must have
+    // happened less than retry window ms ago. If retry count is set to
+    // 0, the app master will fail on any container failure. If the
+    // retry count is set to a number < 0, a container failure will
+    // never trigger an app master failure.
+    int maxRetryCount = clusterManagerConfig.getContainerRetryCount();
+    int maxRetryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+    int currentFailCount = 0;
+
+    if (maxRetryCount == 0) {
+      LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
+          "so shutting down the application master and marking the job as failed.", processorId, containerId);
+
+      tooManyFailedContainers = true;
+    } else if (maxRetryCount > 0) {
+      Instant lastFailureTime;
+      if (processorFailures.containsKey(processorId)) {
+        ProcessorFailure failure = processorFailures.get(processorId);
+        currentFailCount = failure.getCount() + 1;
+        lastFailureTime = failure.getLastFailure();
+      } else {
+        currentFailCount = 1;
+        lastFailureTime = Instant.now();
+      }
+      if (currentFailCount > maxRetryCount) {
+        Duration retryDelay = getHostRetryDelay(lastSeenOn, currentFailCount - 1);
+        long currentRetryWindowMs = Instant.now().toEpochMilli() - lastFailureTime.plus(retryDelay).toEpochMilli();
 
 Review comment:
   As we discussed, the problem here is that lastFailureTime is now set to the current time for the first failure, as opposed to 0 (as in the current code).
   As a result, currentRetryWindowMs can now be < 0, instead of evaluating to roughly the current epoch time.
   
   This means the check below on line 532 can erroneously pass and will fail the job.
   In the current code, the corresponding value (called lastFailureMsDiff) is set to the current time in this case, which fails the check and falls into the else branch.
   
   One way to fix this is to set lastFailureTime to 0 (as in the current code), or to set a flag (firstFailure) and use it to decide the if branch (since there is no failure window for the first failure), as in the sketch below.
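   
   A minimal standalone sketch of the firstFailure-flag option (the names, method shape, and sample values here are illustrative, not the actual ContainerProcessManager patch): skip the retry-window comparison when there is no earlier failure to measure a window against.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   // Illustrative sketch only; not the actual Samza code.
   public class RetryWindowSketch {
   
     /**
      * Returns true if the job should be failed: the failure count exceeds the retry count
      * AND the previous failure (plus any host-affinity retry delay) is within the retry window.
      */
     static boolean shouldFailJob(boolean firstFailure, Instant lastFailureTime, Duration retryDelay,
         int currentFailCount, int maxRetryCount, long maxRetryWindowMs) {
       if (currentFailCount <= maxRetryCount) {
         return false;
       }
       if (firstFailure) {
         // No failure window exists for the first failure, so never fail the job here.
         return false;
       }
       long currentRetryWindowMs =
           Instant.now().toEpochMilli() - lastFailureTime.plus(retryDelay).toEpochMilli();
       return currentRetryWindowMs < maxRetryWindowMs;
     }
   
     public static void main(String[] args) {
       // Guard in action: even with the failure count above the retry count, a first failure
       // never fails the job, because there is no earlier failure to compare against.
       System.out.println(shouldFailJob(true, Instant.now(), Duration.ofSeconds(10), 2, 1, 300_000));   // false
       // A repeated failure 5 seconds ago falls inside a 5-minute window: fail the job.
       System.out.println(shouldFailJob(false, Instant.now().minusSeconds(5), Duration.ZERO, 2, 1, 300_000));   // true
     }
   }
   ```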

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services