rmatharu commented on a change in pull request #1104: SAMZA-2266: Introduce a
backoff when there are repeated failures for host-affinity allocations
URL: https://github.com/apache/samza/pull/1104#discussion_r303559464
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -524,10 +468,125 @@ public void
onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t)
*/
@Override
public void onError(Throwable e) {
- log.error("Exception occurred in callbacks in the Cluster Resource
Manager", e);
+ LOG.error("Exception occurred in callbacks in the Cluster Resource
Manager", e);
exceptionOccurred = e;
}
+ /**
+ * Called within {@link #onResourceCompleted(SamzaResourceStatus)} for
unknown exit statuses. Usually these type of
+ * exit statuses are due to application errors causing the container
resource to fail or for other unknown reasons.
+ * @param resourceStatus reported resource status.
+ * @param containerId container ID
+ * @param processorId processor ID (aka. logical container ID)
+ * @param exitStatus exit status from the {@link
#onResourceCompleted(SamzaResourceStatus)} callback.
+ */
+ @VisibleForTesting
+ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus
resourceStatus, String containerId, String processorId,
+ int exitStatus) {
+ LOG.info("Container ID: {} for Processor ID: {} failed with exit code:
{}.", containerId, processorId, exitStatus);
+
+ state.failedContainers.incrementAndGet();
+ state.failedContainersStatus.put(containerId, resourceStatus);
+ state.jobHealthy.set(false);
+
+ state.neededProcessors.incrementAndGet();
+ // Find out previously running container location
+ String lastSeenOn =
state.jobModelManager.jobModel().getContainerToHostValue(processorId,
SetContainerHostMapping.HOST_KEY);
+ if (!hostAffinityEnabled || lastSeenOn == null) {
+ lastSeenOn = ResourceRequestState.ANY_HOST;
+ }
+ LOG.info("Container ID: {} for Processor ID: {} was last seen on host
{}.", containerId, processorId, lastSeenOn);
+ // A container failed for an unknown reason. Let's check to see if
+ // we need to shutdown the whole app master if too many container
+ // failures have happened. The rules for failing are that the
+ // failure count for a task group id must be > the configured retry
+ // count, and the last failure (the one prior to this one) must have
+ // happened less than retry window ms ago. If retry count is set to
+ // 0, the app master will fail on any container failure. If the
+ // retry count is set to a number < 0, a container failure will
+ // never trigger an app master failure.
+ int maxRetryCount = clusterManagerConfig.getContainerRetryCount();
+ int maxRetryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+ int currentFailCount = 0;
+
+ if (maxRetryCount == 0) {
+ LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry
count is set to 0, " +
+ "so shutting down the application master and marking the job as
failed.", processorId, containerId);
+
+ tooManyFailedContainers = true;
+ } else if (maxRetryCount > 0) {
+ Instant lastFailureTime;
+ if (processorFailures.containsKey(processorId)) {
+ ProcessorFailure failure = processorFailures.get(processorId);
+ currentFailCount = failure.getCount() + 1;
+ lastFailureTime = failure.getLastFailure();
+ } else {
+ currentFailCount = 1;
+ lastFailureTime = Instant.now();
+ }
+ if (currentFailCount > maxRetryCount) {
+ Duration retryDelay = getHostRetryDelay(lastSeenOn, currentFailCount -
1);
+ long currentRetryWindowMs = Instant.now().toEpochMilli() -
lastFailureTime.plus(retryDelay).toEpochMilli();
+
+ if (currentRetryWindowMs < maxRetryWindowMs) {
+ LOG.error("Processor ID: {} (current Container ID: {}) has failed
with {} retries, within a window of {} ms " +
+ "after a retry delay of {} ms. " +
+ "This is greater than max retry count of {} and max retry
window of {} ms, " +
+ "so shutting down the application master and marking the job
as failed.",
+ processorId, containerId, maxRetryCount, currentRetryWindowMs,
retryDelay.toMillis(), maxRetryCount, maxRetryWindowMs);
+
+ // We have too many failures, and we're within the window
+ // boundary, so reset shut down the app master.
+ tooManyFailedContainers = true;
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ } else {
+ LOG.info("Resetting failure count for Processor ID: {} back to 1,
since last failure " +
+ "(for Container ID: {}) was outside the bounds of the retry
window.", processorId, containerId);
+
+ // Reset counter back to 1, since the last failure for this
+ // container happened outside the window boundary.
+ processorFailures.put(processorId, new ProcessorFailure(1,
Instant.now()));
+ }
+ } else {
+ LOG.info("Current failure count for Processor ID: {} is {}.",
processorId, currentFailCount);
+ processorFailures.put(processorId, new
ProcessorFailure(currentFailCount, Instant.now()));
+ }
+ }
+
+ if (!tooManyFailedContainers) {
+ Duration retryDelay = getHostRetryDelay(lastSeenOn, currentFailCount);
+ LOG.info("Retrying request for preferred host: {} for Processor ID: {}
(current Container ID: {}) with a delay of {} ms.",
+ lastSeenOn, processorId, resourceStatus.getContainerId(),
retryDelay.toMillis());
+ handleContainerStop(processorId, resourceStatus.getContainerId(),
lastSeenOn, exitStatus, retryDelay);
+ }
+ }
+
+ /**
+ * Calculates the container request retry delay based on the host name.
+ * @param host host name
+ * @param failCount current number of times the container on the host failed
+ * @return the duration of the exponential backoff delay calculated from the
failCount. The max delay is obtained from
+ * {@link ClusterManagerConfig#getContainerRetryMaxDelayMs()}. If the host
name is equal to
+ * {@link ResourceRequestState#ANY_HOST}, then always return {@link
Duration#ZERO}.
+ */
+ @VisibleForTesting
+ Duration getHostRetryDelay(String host, int failCount) {
+ // Only add a retry delay when host is a preferred host or if the request
failed more than once.
+ if (failCount < 2 || StringUtils.isBlank(host) ||
host.equals(ResourceRequestState.ANY_HOST)) {
+ return Duration.ZERO;
+ }
+
+ long retryDelayMs = 0;
+ long delayMultiplier = Duration.ofSeconds(5).toMillis();
+
+ // add delay only after the first failure
+ if (failCount > 1) {
+ retryDelayMs = delayMultiplier * Math.round(Math.pow(2, failCount - 2));
+ }
+ retryDelayMs = Math.min(retryDelayMs,
clusterManagerConfig.getContainerRetryMaxDelayMs());
Review comment:
Perhaps consider using org.apache.samza.util.ExponentialSleepStrategy or
org.apache.commons.math3.distribution.ExponentialDistribution
to avoid dealing with backoff-math logic here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services