This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e17e08  SAMZA-2266: Introduce a backoff when there are repeated 
failures for host-affinity allocations (#1104)
2e17e08 is described below

commit 2e17e08ba2469409243bc1749c608ba31782ff5b
Author: Daniel Nishimura <[email protected]>
AuthorDate: Wed Aug 14 10:44:47 2019 -0700

    SAMZA-2266: Introduce a backoff when there are repeated failures for 
host-affinity allocations (#1104)
    
    * SAMZA-2266: Introduce a backoff when there are repeated failures for 
host-affinity allocations
---
 .../versioned/jobs/samza-configurations.md         |   1 +
 .../clustermanager/AbstractContainerAllocator.java |  60 +++-
 .../samza/clustermanager/ContainerAllocator.java   |   5 +-
 .../clustermanager/ContainerProcessManager.java    | 298 ++++++++++-------
 .../HostAwareContainerAllocator.java               |  18 +-
 .../samza/clustermanager/ProcessorFailure.java     |  22 +-
 .../samza/clustermanager/ResourceRequestState.java | 127 +++++--
 .../samza/clustermanager/SamzaResourceRequest.java |  35 +-
 .../clustermanager/StandbyContainerManager.java    |  14 +-
 .../apache/samza/config/ClusterManagerConfig.java  |  21 +-
 .../clustermanager/TestContainerAllocator.java     |  22 +-
 .../TestContainerProcessManager.java               | 366 ++++++++++++---------
 .../clustermanager/TestContainerRequestState.java  |  94 +++++-
 .../TestHostAwareContainerAllocator.java           |  18 +-
 14 files changed, 733 insertions(+), 368 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md 
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index cde7594..2309555 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -295,6 +295,7 @@ Samza supports both standalone and clustered 
([YARN](yarn-jobs.html)) [deploymen
 |--- |--- |--- |
 |cluster-manager.container.retry.count|8|If a container fails, it is 
automatically restarted by Samza. However, if a container keeps failing shortly 
after startup, that indicates a deeper problem, so we should kill the job 
rather than retrying indefinitely. This property determines the maximum number 
of times we are willing to restart a failed container in quick succession (the 
time period is configured with `cluster-manager.container.retry.window.ms`). 
Each container in the job is count [...]
 |cluster-manager.container.retry.window.ms|300000|This property determines how 
frequently a container is allowed to fail before we give up and fail the job. 
If the same container has failed more than 
`cluster-manager.container.retry.count` times, and the time between failures 
was less than this property `cluster-manager.container.retry.window.ms` (in 
milliseconds), then we fail the job. There is no limit to the number of times 
we will restart a container if the time between failures is g [...]
+|cluster-manager.container.preferred-host.last.retry.delay.ms|360000|The delay 
added to the last retry for a failing container after all but one of 
`cluster-manager.container.retry.count` retries have been exhausted. The delay is 
only added when `job.host-affinity.enabled` is true and the retried request is 
for a preferred host. This addresses the issue where there may be a delay when 
a preferred host is marked invalid and the container continuously attempts to 
restart and fail on the inva [...]
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor 
of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is 
responsible for matching requests to allocated containers. The sleep interval 
for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread 
periodically checks the state of the container requests and allocated 
containers to determine the assignment of a container to an allocated resource. 
This property determines the number of milliseconds before a container request 
is considered to have expired / timed-out. When a request expires, it gets 
allocated to any available container that was returned by the cluster manager.|
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index c5caa21..bba4b52 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -18,6 +18,10 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -28,8 +32,6 @@ import org.apache.samza.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 
 /**
  * {@link AbstractContainerAllocator} makes requests for physical resources to 
the resource manager and also runs
@@ -110,6 +112,10 @@ public abstract class AbstractContainerAllocator 
implements Runnable {
     while (isRunning) {
       try {
         assignResourceRequests();
+
+        // Move delayed requests that are ready to the active request queue
+        resourceRequestState.sendPendingDelayedResourceRequests();
+
         // Release extra resources and update the entire system's state
         resourceRequestState.releaseExtraResources();
 
@@ -151,7 +157,7 @@ public abstract class AbstractContainerAllocator implements 
Runnable {
 
     // Run processor on resource
     log.info("Found Container ID: {} for Processor ID: {} on host: {} for 
request creation time: {}.",
-        resource.getContainerId(), processorId, preferredHost, 
request.getRequestTimestampMs());
+        resource.getContainerId(), processorId, preferredHost, 
request.getRequestTimestamp());
 
     // Update processor state as "pending" and then issue a request to launch 
it. It's important to perform the state-update
     // prior to issuing the request. Otherwise, there's a race where the 
response callback may arrive sooner and not see
@@ -175,35 +181,65 @@ public abstract class AbstractContainerAllocator 
implements Runnable {
   public abstract void requestResources(Map<String, String> 
processorToHostMapping);
 
   /**
-   * Checks if this allocator has a pending resource request.
+   * Checks if this allocator has a pending resource request with a request 
timestamp equal to or earlier than the current
+   * timestamp.
    * @return {@code true} if there is a pending request, {@code false} 
otherwise.
    */
-  protected final boolean hasPendingRequest() {
-    return peekPendingRequest() != null;
+  protected final boolean hasReadyPendingRequest() {
+    return peekReadyPendingRequest().isPresent();
   }
 
   /**
-   * Retrieves, but does not remove, the next pending request in the queue.
+   * Retrieves, but does not remove, the next pending request in the queue 
whose {@link SamzaResourceRequest#getRequestTimestamp()}
+   * is equal to or earlier than the current timestamp.
    *
    * @return  the pending request or {@code null} if there is no pending 
request.
    */
-  protected final SamzaResourceRequest peekPendingRequest() {
-    return resourceRequestState.peekPendingRequest();
+  protected final Optional<SamzaResourceRequest> peekReadyPendingRequest() {
+    SamzaResourceRequest pendingRequest = 
resourceRequestState.peekPendingRequest();
+    return Optional.ofNullable(pendingRequest);
   }
 
   /**
    * Requests a resource from the cluster manager
-   *
    * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
    * @param preferredHost name of the host that you prefer to run the 
processor on
    */
   public final void requestResource(String processorId, String preferredHost) {
-    SamzaResourceRequest request = getResourceRequest(processorId, 
preferredHost);
+    requestResourceWithDelay(processorId, preferredHost, Duration.ZERO);
+  }
+
+  /**
+   * Requests a resource from the cluster manager with a request timestamp of 
the current time plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   */
+  public final void requestResourceWithDelay(String processorId, String 
preferredHost, Duration delay) {
+    SamzaResourceRequest request = getResourceRequestWithDelay(processorId, 
preferredHost, delay);
     issueResourceRequest(request);
   }
 
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @return the created request
+   */
   public final SamzaResourceRequest getResourceRequest(String processorId, 
String preferredHost) {
-    return new SamzaResourceRequest(this.containerNumCpuCores, 
this.containerMemoryMb, preferredHost, processorId);
+    return getResourceRequestWithDelay(processorId, preferredHost, 
Duration.ZERO);
+  }
+
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager 
with a request timestamp of the current time
+   * plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequestWithDelay(String 
processorId, String preferredHost, Duration delay) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, 
this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
   }
 
   public final void issueResourceRequest(SamzaResourceRequest request) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 9a1e31e..9078f25 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -51,9 +51,8 @@ public class ContainerAllocator extends 
AbstractContainerAllocator {
    * */
   @Override
   public void assignResourceRequests() {
-    while (hasPendingRequest() && 
hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
-      SamzaResourceRequest request = peekPendingRequest();
-      runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+    while (hasReadyPendingRequest() && 
hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
+      peekReadyPendingRequest().ifPresent(request -> 
runStreamProcessor(request, ResourceRequestState.ANY_HOST));
     }
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index cba8176..8e12f82 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
@@ -64,7 +65,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  */
 public class ContainerProcessManager implements 
ClusterResourceManager.Callback   {
 
-  private static final Logger log = 
LoggerFactory.getLogger(ContainerProcessManager.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ContainerProcessManager.class);
 
   /**
    * Metrics for the {@link ContainerProcessManager}
@@ -72,7 +73,6 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
   private final static String EXEC_ENV_CONTAINER_ID_SYS_PROPERTY = 
"CONTAINER_ID";
 
-
   /**
    * Does this Samza Job need hostAffinity when containers are allocated.
    */
@@ -171,19 +171,19 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         buildContainerAllocator(this.hostAffinityEnabled, 
this.clusterResourceManager, this.clusterManagerConfig,
             config, this.standbyContainerManager, state, classLoader);
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
-    log.info("Finished container process manager initialization.");
+    LOG.info("Finished container process manager initialization.");
   }
 
   @VisibleForTesting
-  ContainerProcessManager(Config config,
+  ContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
       SamzaApplicationState state,
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
       Optional<AbstractContainerAllocator> allocator,
       ClassLoader classLoader) {
     this.state = state;
-    this.clusterManagerConfig = new ClusterManagerConfig(config);
-    this.jobConfig = new JobConfig(config);
+    this.clusterManagerConfig = clusterManagerConfig;
+    this.jobConfig = new JobConfig(clusterManagerConfig);
 
     this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
 
@@ -192,10 +192,10 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     this.diagnosticsManager = Option.empty();
     this.containerAllocator = allocator.orElseGet(
         () -> buildContainerAllocator(this.hostAffinityEnabled, 
this.clusterResourceManager, this.clusterManagerConfig,
-            config, this.standbyContainerManager, state, classLoader));
+            clusterManagerConfig, this.standbyContainerManager, state, 
classLoader));
 
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
-    log.info("Finished container process manager initialization");
+    LOG.info("Finished container process manager initialization");
   }
 
   private static AbstractContainerAllocator buildContainerAllocator(boolean 
hostAffinityEnabled,
@@ -210,32 +210,39 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   }
 
   public boolean shouldShutdown() {
-    log.debug("ContainerProcessManager state: Completed containers: {}, 
Configured containers: {}, Are there too many failed containers: {}, Is 
allocator thread alive: {}",
+    LOG.debug("ContainerProcessManager state: Completed containers: {}, 
Configured containers: {}, Are there too many failed containers: {}, Is 
allocator thread alive: {}",
       state.completedProcessors.get(), state.processorCount, 
jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
 
     if (exceptionOccurred != null) {
-      log.error("Exception in container process manager", exceptionOccurred);
+      LOG.error("Exception in container process manager", exceptionOccurred);
       throw new SamzaException(exceptionOccurred);
     }
     return jobFailureCriteriaMet || state.completedProcessors.get() == 
state.processorCount.get() || !allocatorThread.isAlive();
   }
 
   public void start() {
-    log.info("Starting the container process manager");
+    LOG.info("Starting the container process manager");
+
+    int containerRetryCount = clusterManagerConfig.getContainerRetryCount();
+    if (containerRetryCount > -1) {
+      LOG.info("Max retries on restarting failed containers: {}", 
containerRetryCount);
+    } else {
+      LOG.info("Infinite retries on restarting failed containers");
+    }
 
     if (jvmMetrics != null) {
       jvmMetrics.start();
     }
 
-    if (this.metricsReporters != null) {
-      this.metricsReporters.values().forEach(reporter -> reporter.start());
+    if (metricsReporters != null) {
+      metricsReporters.values().forEach(reporter -> reporter.start());
     }
 
-    if (this.diagnosticsManager.isDefined()) {
-      this.diagnosticsManager.get().start();
+    if (diagnosticsManager.isDefined()) {
+      diagnosticsManager.get().start();
     }
 
-    log.info("Starting the cluster resource manager");
+    LOG.info("Starting the cluster resource manager");
     clusterResourceManager.start();
 
     
state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
@@ -246,59 +253,59 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     containerAllocator.requestResources(processorToHostMapping);
 
     // Start container allocator thread
-    log.info("Starting the container allocator thread");
+    LOG.info("Starting the container allocator thread");
     allocatorThread.start();
-    log.info("Starting the container process manager");
+    LOG.info("Starting the container process manager");
   }
 
   public void stop() {
-    log.info("Stopping the container process manager");
+    LOG.info("Stopping the container process manager");
 
     // Shutdown allocator thread
     containerAllocator.stop();
     try {
       allocatorThread.join();
-      log.info("Stopped container allocator");
+      LOG.info("Stopped container allocator");
     } catch (InterruptedException ie) {
-      log.error("Allocator thread join threw an interrupted exception", ie);
+      LOG.error("Allocator thread join threw an interrupted exception", ie);
       Thread.currentThread().interrupt();
     }
 
-    if (this.diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isDefined()) {
       try {
-        this.diagnosticsManager.get().stop();
+        diagnosticsManager.get().stop();
       } catch (InterruptedException e) {
-        log.error("InterruptedException while stopping diagnosticsManager", e);
+        LOG.error("InterruptedException while stopping diagnosticsManager", e);
       }
     }
 
     try {
 
-      if (this.metricsReporters != null) {
-        this.metricsReporters.values().forEach(reporter -> reporter.stop());
+      if (metricsReporters != null) {
+        metricsReporters.values().forEach(reporter -> reporter.stop());
       }
 
-      if (this.jvmMetrics != null) {
+      if (jvmMetrics != null) {
         jvmMetrics.stop();
       }
 
-      log.info("Stopped containerProcessManagerMetrics reporters");
+      LOG.info("Stopped containerProcessManagerMetrics reporters");
     } catch (Throwable e) {
-      log.error("Exception while stopping containerProcessManagerMetrics", e);
+      LOG.error("Exception while stopping containerProcessManagerMetrics", e);
     }
 
     try {
       clusterResourceManager.stop(state.status);
-      log.info("Stopped the cluster resource manager");
+      LOG.info("Stopped the cluster resource manager");
     } catch (Throwable e) {
-      log.error("Exception while stopping cluster resource manager", e);
+      LOG.error("Exception while stopping cluster resource manager", e);
     }
 
-    log.info("Stopped the container process manager");
+    LOG.info("Stopped the container process manager");
   }
 
   public void onResourceAllocated(SamzaResource resource) {
-    log.info("Container ID: {} allocated from RM on host: {}", 
resource.getContainerId(), resource.getHost());
+    LOG.info("Container ID: {} allocated from RM on host: {}", 
resource.getContainerId(), resource.getHost());
     containerAllocator.addResource(resource);
   }
 
@@ -313,7 +320,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     String hostName = null;
     for (Map.Entry<String, SamzaResource> entry: 
state.runningProcessors.entrySet()) {
       if 
(entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
-        log.info("Container ID: {} matched running Processor ID: {} on host: 
{}", containerId, entry.getKey(), entry.getValue().getHost());
+        LOG.info("Container ID: {} matched running Processor ID: {} on host: 
{}", containerId, entry.getKey(), entry.getValue().getHost());
 
         processorId = entry.getKey();
         hostName = entry.getValue().getHost();
@@ -321,7 +328,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       }
     }
     if (processorId == null) {
-      log.info("No running Processor ID found for Container ID: {} with 
Status: {}. Ignoring redundant notification.", containerId, 
resourceStatus.toString());
+      LOG.info("No running Processor ID found for Container ID: {} with 
Status: {}. Ignoring redundant notification.", containerId, 
resourceStatus.toString());
       state.redundantNotifications.incrementAndGet();
 
       if (resourceStatus.getExitCode() != SamzaResourceStatus.SUCCESS) {
@@ -336,7 +343,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     int exitStatus = resourceStatus.getExitCode();
     switch (exitStatus) {
       case SamzaResourceStatus.SUCCESS:
-        log.info("Container ID: {} for Processor ID: {} completed 
successfully.", containerId, processorId);
+        LOG.info("Container ID: {} for Processor ID: {} completed 
successfully.", containerId, processorId);
 
         state.completedProcessors.incrementAndGet();
 
@@ -344,7 +351,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         processorFailures.remove(processorId);
 
         if (state.completedProcessors.get() == state.processorCount.get()) {
-          log.info("Setting job status to SUCCEEDED since all containers have 
been marked as completed.");
+          LOG.info("Setting job status to SUCCEEDED since all containers have 
been marked as completed.");
           state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
         }
         break;
@@ -352,7 +359,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       case SamzaResourceStatus.DISK_FAIL:
       case SamzaResourceStatus.ABORTED:
       case SamzaResourceStatus.PREEMPTED:
-        log.info("Container ID: {} for Processor ID: {} was released with an 
exit code: {}. This means that " +
+        LOG.info("Container ID: {} for Processor ID: {} was released with an 
exit code: {}. This means that " +
                 "the container was killed by YARN, either due to being 
released by the application master " +
                 "or being 'lost' due to node failures etc. or due to 
preemption by the RM." +
                 "Requesting a new container for the processor.",
@@ -368,86 +375,15 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         state.jobHealthy.set(false);
 
         // handle container stop due to node fail
-        this.handleContainerStop(processorId, resourceStatus.getContainerId(), 
ResourceRequestState.ANY_HOST, exitStatus);
+        handleContainerStop(processorId, resourceStatus.getContainerId(), 
ResourceRequestState.ANY_HOST, exitStatus, Duration.ZERO);
         break;
 
       default:
-        log.info("Container ID: {} for Processor ID: {} failed with exit code: 
{}.", containerId, processorId, exitStatus);
-
-        state.failedContainers.incrementAndGet();
-        state.failedContainersStatus.put(containerId, resourceStatus);
-        state.jobHealthy.set(false);
-
-        state.neededProcessors.incrementAndGet();
-        // Find out previously running container location
-        String lastSeenOn = 
state.jobModelManager.jobModel().getContainerToHostValue(processorId, 
SetContainerHostMapping.HOST_KEY);
-        if (!hostAffinityEnabled || lastSeenOn == null) {
-          lastSeenOn = ResourceRequestState.ANY_HOST;
-        }
-        log.info("Container ID: {} for Processor ID: {} was last seen on host 
{}.", containerId, processorId, lastSeenOn);
-        // A container failed for an unknown reason. Let's check to see if
-        // we need to shutdown the whole app master if too many container
-        // failures have happened. The rules for failing are that the
-        // failure count for a task group id must be > the configured retry
-        // count, and the last failure (the one prior to this one) must have
-        // happened less than retry window ms ago. If retry count is set to
-        // 0, the app master will fail on any container failure. If the
-        // retry count is set to a number < 0, a container failure will
-        // never trigger an app master failure.
-        int retryCount = clusterManagerConfig.getContainerRetryCount();
-        int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
-
-        if (retryCount == 0) {
-          log.error("Processor ID: {} (current Container ID: {}) failed, and 
retry count is set to 0, " +
-              "so shutting down the application master and marking the job as 
failed.", processorId, containerId);
-
-          jobFailureCriteriaMet = true;
-        } else if (retryCount > 0) {
-          int currentFailCount;
-          long lastFailureMsDiff;
-          if (processorFailures.containsKey(processorId)) {
-            ProcessorFailure failure = processorFailures.get(processorId);
-            currentFailCount = failure.getCount() + 1;
-            lastFailureMsDiff = Instant.now().toEpochMilli() - 
failure.getLastFailure();
-          } else {
-            currentFailCount = 1;
-            lastFailureMsDiff = 0;
-          }
-
-          if (lastFailureMsDiff >= retryWindowMs) {
-            log.info("Resetting failure count for Processor ID: {} back to 1, 
since last failure " +
-                "(for Container ID: {}) was outside the bounds of the retry 
window.", processorId, containerId);
-
-            // Reset counter back to 1, since the last failure for this
-            // container happened outside the window boundary.
-            currentFailCount = 1;
-          }
-
-          // if fail count is (1 initial failure + max retries) then fail job.
-          if (currentFailCount > retryCount) {
-            log.error("Processor ID: {} (current Container ID: {}) has failed 
{} times, with last failure {} ms ago. " +
-                    "This is greater than retry count of {} and window of {} 
ms, " +
-                    "so shutting down the application master and marking the 
job as failed.",
-                processorId, containerId, currentFailCount, lastFailureMsDiff, 
retryCount, retryWindowMs);
-
-            // We have too many failures, and we're within the window
-            // boundary, so reset shut down the app master.
-            jobFailureCriteriaMet = true;
-            state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
-          } else {
-            log.info("Current failure count for Processor ID: {} is {}.", 
processorId, currentFailCount);
-            processorFailures.put(processorId, new 
ProcessorFailure(currentFailCount, Instant.now().toEpochMilli()));
-          }
-        }
-
-        if (!jobFailureCriteriaMet) {
-          handleContainerStop(processorId, resourceStatus.getContainerId(), 
lastSeenOn, exitStatus);
-        }
-
+        onResourceCompletedWithUnknownStatus(resourceStatus, containerId, 
processorId, exitStatus);
     }
 
-    if (this.diagnosticsManager.isDefined()) {
-      this.diagnosticsManager.get().addProcessorStopEvent(processorId, 
resourceStatus.getContainerId(), hostName, exitStatus);
+    if (diagnosticsManager.isDefined()) {
+      diagnosticsManager.get().addProcessorStopEvent(processorId, 
resourceStatus.getContainerId(), hostName, exitStatus);
     }
   }
 
@@ -472,13 +408,13 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     // 1. Obtain the processor ID for the pending container on this resource.
     String processorId = getPendingProcessorId(containerId);
-    log.info("Successfully started Processor ID: {} on Container ID: {} on 
host: {}",
+    LOG.info("Successfully started Processor ID: {} on Container ID: {} on 
host: {}",
         processorId, containerId, containerHost);
 
     // 2. Remove the container from the pending buffer and add it to the 
running buffer. Additionally, update the
     // job-health metric.
     if (processorId != null) {
-      log.info("Moving Processor ID: {} on Container ID: {} on host: {} from 
pending to running state.",
+      LOG.info("Moving Processor ID: {} on Container ID: {} on host: {} from 
pending to running state.",
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
@@ -487,7 +423,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         state.jobHealthy.set(true);
       }
     } else {
-      log.warn("Did not find a pending Processor ID for Container ID: {} on 
host: {}. " +
+      LOG.warn("Did not find a pending Processor ID for Container ID: {} on 
host: {}. " +
           "Ignoring invalid/redundant notification.", containerId, 
containerHost);
     }
   }
@@ -499,23 +435,23 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     // 1. Obtain the pending Samza processor ID for this container ID.
     String processorId = getPendingProcessorId(containerId);
-    log.error("Launch failed for pending Processor ID: {} on Container ID: {} 
on host: {} with exception: {}",
+    LOG.error("Launch failed for pending Processor ID: {} on Container ID: {} 
on host: {} with exception: {}",
         processorId, containerId, containerHost, t);
 
     // 2. Release resources for containers that failed back to YARN
-    log.info("Releasing un-startable Container ID: {} for pending Processor 
ID: {}", containerId, processorId);
+    LOG.info("Releasing un-startable Container ID: {} for pending Processor 
ID: {}", containerId, processorId);
     clusterResourceManager.releaseResources(resource);
 
     // 3. Re-request resources on ANY_HOST in case of launch failures on the 
preferred host, if standby are not enabled
     // otherwise calling standbyContainerManager
     if (processorId != null && standbyContainerManager.isPresent()) {
-      
this.standbyContainerManager.get().handleContainerLaunchFail(processorId, 
containerId, containerAllocator);
+      standbyContainerManager.get().handleContainerLaunchFail(processorId, 
containerId, containerAllocator);
     } else if (processorId != null) {
-      log.info("Falling back to ANY_HOST for Processor ID: {} since launch 
failed for Container ID: {} on host: {}",
+      LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch 
failed for Container ID: {} on host: {}",
           processorId, containerId, containerHost);
       containerAllocator.requestResource(processorId, 
ResourceRequestState.ANY_HOST);
     } else {
-      log.warn("Did not find a pending Processor ID for Container ID: {} on 
host: {}. " +
+      LOG.warn("Did not find a pending Processor ID for Container ID: {} on 
host: {}. " +
           "Ignoring invalid/redundant notification.", containerId, 
containerHost);
     }
   }
@@ -526,7 +462,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
    */
   @Override
   public void onError(Throwable e) {
-    log.error("Exception occurred in callbacks in the Cluster Resource 
Manager", e);
+    LOG.error("Exception occurred in callbacks in the Cluster Resource 
Manager", e);
     exceptionOccurred = e;
   }
 
@@ -541,6 +477,114 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   }
 
   /**
+   * Called within {@link #onResourceCompleted(SamzaResourceStatus)} for 
unknown exit statuses. These exit statuses
+   * correspond to container completion other than container 
run-to-completion, abort or preemption, or disk failure
+   * (e.g., detected by YARN's NM healthchecks).
+   * @param resourceStatus reported resource status.
+   * @param containerId container ID
+   * @param processorId processor ID (aka. logical container ID)
+   * @param exitStatus exit status from the {@link 
#onResourceCompleted(SamzaResourceStatus)} callback.
+   */
+  @VisibleForTesting
+  void onResourceCompletedWithUnknownStatus(SamzaResourceStatus 
resourceStatus, String containerId, String processorId,
+      int exitStatus) {
+    LOG.info("Container ID: {} for Processor ID: {} failed with exit code: 
{}.", containerId, processorId, exitStatus);
+    Instant now = Instant.now();
+    state.failedContainers.incrementAndGet();
+    state.failedContainersStatus.put(containerId, resourceStatus);
+    state.jobHealthy.set(false);
+
+    state.neededProcessors.incrementAndGet();
+    // Find out previously running container location
+    String lastSeenOn = 
state.jobModelManager.jobModel().getContainerToHostValue(processorId, 
SetContainerHostMapping.HOST_KEY);
+    if (!hostAffinityEnabled || lastSeenOn == null) {
+      lastSeenOn = ResourceRequestState.ANY_HOST;
+    }
+    LOG.info("Container ID: {} for Processor ID: {} was last seen on host 
{}.", containerId, processorId, lastSeenOn);
+    // A container failed for an unknown reason. Let's check to see if
+    // we need to shutdown the whole app master if too many container
+    // failures have happened. The rules for failing are that the
+    // failure count for a task group id must be > the configured retry
+    // count, and the last failure (the one prior to this one) must have
+    // happened less than retry window ms ago. If retry count is set to
+    // 0, the app master will fail on any container failure. If the
+    // retry count is set to a number < 0, a container failure will
+    // never trigger an app master failure.
+    int retryCount = clusterManagerConfig.getContainerRetryCount();
+    int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+    int currentFailCount;
+
+    if (retryCount == 0) {
+      LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry 
count is set to 0, " +
+          "so shutting down the application master and marking the job as 
failed.", processorId, containerId);
+
+      jobFailureCriteriaMet = true;
+    } else if (retryCount > 0) {
+      long durationSinceLastRetryMs;
+      if (processorFailures.containsKey(processorId)) {
+        ProcessorFailure failure = processorFailures.get(processorId);
+        currentFailCount = failure.getCount() + 1;
+        Duration lastRetryDelay = getRetryDelay(processorId);
+        Instant retryAttemptedAt = 
failure.getLastFailure().plus(lastRetryDelay);
+        durationSinceLastRetryMs = now.toEpochMilli() - 
retryAttemptedAt.toEpochMilli();
+        if (durationSinceLastRetryMs < 0) {
+          // This should never happen without changes to the system clock or 
time travel. Log a warning just in case.
+          LOG.warn("Last failure at: {} with a retry attempted at: {} which is 
supposed to be before current time of: {}",
+              failure.getLastFailure(), retryAttemptedAt, now);
+        }
+      } else {
+        currentFailCount = 1;
+        durationSinceLastRetryMs = 0;
+      }
+
+      if (durationSinceLastRetryMs >= retryWindowMs) {
+        LOG.info("Resetting failure count for Processor ID: {} back to 1, 
since last failure " +
+            "(for Container ID: {}) was outside the bounds of the retry 
window.", processorId, containerId);
+
+        // Reset counter back to 1, since the last failure for this
+        // container happened outside the window boundary.
+        currentFailCount = 1;
+      }
+
+      // if fail count is (1 initial failure + max retries) then fail job.
+      if (currentFailCount > retryCount) {
+        LOG.error("Processor ID: {} (current Container ID: {}) has failed {} 
times, with last failure {} ms ago. " +
+                "This is greater than retry count of {} and window of {} ms, " 
+
+                "so shutting down the application master and marking the job 
as failed.",
+            processorId, containerId, currentFailCount, 
durationSinceLastRetryMs, retryCount, retryWindowMs);
+
+        // We have too many failures, and we're within the window
+        // boundary, so reset shut down the app master.
+        jobFailureCriteriaMet = true;
+        state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+      } else {
+        LOG.info("Current failure count for Processor ID: {} is {}.", 
processorId, currentFailCount);
+        Duration retryDelay = Duration.ZERO;
+        if (!ResourceRequestState.ANY_HOST.equals(lastSeenOn) && 
currentFailCount == retryCount) {
+          // Add the preferred host last retry delay on the last retry
+          retryDelay = 
Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
+        }
+        processorFailures.put(processorId, new 
ProcessorFailure(currentFailCount, now, retryDelay));
+      }
+    }
+
+    if (!jobFailureCriteriaMet) {
+      Duration retryDelay = getRetryDelay(processorId);
+      if (!retryDelay.isZero()) {
+        LOG.info("Adding a delay of: {} seconds on the last container retry 
request for preferred host: {}",
+            retryDelay.getSeconds(), lastSeenOn);
+      }
+      handleContainerStop(processorId, resourceStatus.getContainerId(), 
lastSeenOn, exitStatus, retryDelay);
+    }
+  }
+
+  private Duration getRetryDelay(String processorId) {
+    return processorFailures.containsKey(processorId)
+        ? processorFailures.get(processorId).getLastRetryDelay()
+        : Duration.ZERO;
+  }
+
+  /**
    * Returns an instantiated {@link ResourceManagerFactory} from a {@link 
ClusterManagerConfig}. The
    * {@link ResourceManagerFactory} is used to return an implementation of a 
{@link ClusterResourceManager}
    *
@@ -555,7 +599,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     try {
       factory = ReflectionUtil.getObj(classLoader, 
containerManagerFactoryClass, ResourceManagerFactory.class);
     } catch (Exception e) {
-      log.error("Error creating the cluster resource manager.", e);
+      LOG.error("Error creating the cluster resource manager.", e);
       throw new SamzaException(e);
     }
     return factory;
@@ -570,19 +614,19 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   private String getPendingProcessorId(String resourceId) {
     for (Map.Entry<String, SamzaResource> entry: 
state.pendingProcessors.entrySet()) {
       if (entry.getValue().getContainerId().equals(resourceId)) {
-        log.info("Container ID: {} matched pending Processor ID: {} on host: 
{}", resourceId, entry.getKey(), entry.getValue().getHost());
+        LOG.info("Container ID: {} matched pending Processor ID: {} on host: 
{}", resourceId, entry.getKey(), entry.getValue().getHost());
         return entry.getKey();
       }
     }
     return null;
   }
 
-  private void handleContainerStop(String processorId, String resourceID, 
String preferredHost, int exitStatus) {
+  private void handleContainerStop(String processorId, String resourceID, 
String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
     if (standbyContainerManager.isPresent()) {
-      standbyContainerManager.get().handleContainerStop(processorId, 
resourceID, preferredHost, exitStatus, containerAllocator);
+      standbyContainerManager.get().handleContainerStop(processorId, 
resourceID, preferredHost, exitStatus, containerAllocator, 
preferredHostRetryDelay);
     } else {
       // If StandbyTasks are not enabled, we simply make a request for the 
preferredHost
-      containerAllocator.requestResource(processorId, preferredHost);
+      containerAllocator.requestResourceWithDelay(processorId, preferredHost, 
preferredHostRetryDelay);
     }
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 044f29c..177c473 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.clustermanager;
 
-
-import java.util.Optional;
+import java.time.Instant;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,11 +69,13 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
    */
   @Override
   public void assignResourceRequests()  {
-    while (hasPendingRequest()) {
-      SamzaResourceRequest request = peekPendingRequest();
+    for (Optional<SamzaResourceRequest> requestOptional = 
peekReadyPendingRequest();
+        requestOptional.isPresent();
+        requestOptional = peekReadyPendingRequest()) {
+      SamzaResourceRequest request = requestOptional.get();
       String processorId = request.getProcessorId();
       String preferredHost = request.getPreferredHost();
-      long requestCreationTime = request.getRequestTimestampMs();
+      Instant requestCreationTime = request.getRequestTimestamp();
       log.info("Handling assignment request for Processor ID: {} on preferred 
host: {}.", processorId, preferredHost);
 
       if (hasAllocatedResource(preferredHost)) {
@@ -144,11 +146,11 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
    * @return true if request has expired
    */
   private boolean isRequestExpired(SamzaResourceRequest request) {
-    long currTime = System.currentTimeMillis();
-    boolean requestExpired =  currTime - request.getRequestTimestampMs() > 
requestTimeout;
+    long currTime = Instant.now().toEpochMilli();
+    boolean requestExpired =  currTime - 
request.getRequestTimestamp().toEpochMilli() > requestTimeout;
     if (requestExpired) {
       log.info("Request for Processor ID: {} on host: {} with creation time: 
{} has expired at current time: {} after timeout: {} ms.",
-          request.getProcessorId(), request.getPreferredHost(), 
request.getRequestTimestampMs(), currTime, requestTimeout);
+          request.getProcessorId(), request.getPreferredHost(), 
request.getRequestTimestamp(), currTime, requestTimeout);
     }
     return requestExpired;
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
index e1b0c93..8ab00f9 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
@@ -19,11 +19,15 @@
 
 package org.apache.samza.clustermanager;
 
+import java.time.Duration;
+import java.time.Instant;
+
+
 /**
  * A ProcessorFailure instance encapsulates information related to a Samza 
processor failure.
  * It keeps track of the time of the last failure, the number of failures.
  */
-public class ProcessorFailure {
+class ProcessorFailure {
   /**
    * Number of times a processor has failed
    */
@@ -31,18 +35,28 @@ public class ProcessorFailure {
   /**
    * Latest failure time of the processor
    */
-  private final Long lastFailure;
+  private final Instant lastFailure;
 
-  public ProcessorFailure(int count, Long lastFailure) {
+  /**
+   * The delay added to the retry attempt of the last failure
+   */
+  private final Duration lastRetryDelay;
+
+  public ProcessorFailure(int count, Instant lastFailure, Duration 
lastRetryDelay) {
     this.count = count;
     this.lastFailure = lastFailure;
+    this.lastRetryDelay = lastRetryDelay;
   }
 
   public int getCount() {
     return count;
   }
 
-  public Long getLastFailure() {
+  public Instant getLastFailure() {
     return lastFailure;
   }
+
+  public Duration getLastRetryDelay() {
+    return lastRetryDelay;
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 8da3337..2e4bcdb 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -18,17 +18,19 @@
  */
 package org.apache.samza.clustermanager;
 
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link ResourceRequestState} maintains the state variables for all the 
resource requests and the allocated resources returned
@@ -46,16 +48,27 @@ public class ResourceRequestState {
    * Maintain a map of hostname to a list of resources allocated on this host
    */
   private final Map<String, List<SamzaResource>> allocatedResources = new 
HashMap<>();
+
   /**
    * Represents the queue of resource requests made by the {@link 
ContainerProcessManager}
    */
-  private final PriorityQueue<SamzaResourceRequest> requestsQueue = new 
PriorityQueue<SamzaResourceRequest>();
+  private final PriorityQueue<SamzaResourceRequest> requestsQueue = new 
PriorityQueue<>();
+
+  /**
+   * Represents the queue of delayed resource requests made by the {@link 
ContainerProcessManager}
+   * The difference between requestsQueue and delayedRequestsQueue is that, 
any request present the requestsQueue has
+   * been sent out to the cluster resource manager, while requests in the 
delayedRequestsQueue will be sent to the
+   * cluster resource manager only when their delay reaches 0.
+   */
+  private final DelayedRequestQueue delayedRequestsQueue = new 
DelayedRequestQueue();
+
   /**
    * Maintain a map of hostname to the number of requests made for resources 
on this host
    * This state variable is used to look-up whether an allocated resource on a 
host was ever requested in the past.
    * This map is not updated when host-affinity is not enabled
    */
   private final Map<String, AtomicInteger> hostRequestCounts = new HashMap<>();
+
   /**
    * Indicates whether host-affinity is enabled or not
    */
@@ -71,34 +84,20 @@ public class ResourceRequestState {
   }
 
   /**
-   * Enqueues a {@link SamzaResourceRequest} to be sent to a {@link 
ClusterResourceManager}.
+   * Sends a {@link SamzaResourceRequest} to the {@link 
ClusterResourceManager} and queues the {@link SamzaResourceRequest}
+   * to be matched with the {@link SamzaResource} when it is provisioned.
    *
-   * @param request {@link SamzaResourceRequest} to be queued
+   * @param request {@link SamzaResourceRequest} to be sent and queued.
    */
   public void addResourceRequest(SamzaResourceRequest request) {
     synchronized (lock) {
-      requestsQueue.add(request);
-      String preferredHost = request.getPreferredHost();
-
-      // if host affinity is enabled, update state.
-      if (hostAffinityEnabled) {
-        //increment # of requests on the host.
-        if (hostRequestCounts.containsKey(preferredHost)) {
-          hostRequestCounts.get(preferredHost).incrementAndGet();
-        } else {
-          hostRequestCounts.put(preferredHost, new AtomicInteger(1));
-        }
-        /**
-         * The following is important to correlate allocated resource data 
with the requestsQueue made before. If
-         * the preferredHost is requested for the first time, the state should 
reflect that the allocatedResources
-         * list is empty and NOT null.
-         */
-
-        if (!allocatedResources.containsKey(preferredHost)) {
-          allocatedResources.put(preferredHost, new ArrayList<>());
-        }
+      if (request.getRequestTimestamp().isAfter(Instant.now())) {
+        delayedRequestsQueue.add(request);
+        return;
       }
-      manager.requestResources(request);
+
+      // Send request immediately if the request timestamp is not in the 
future.
+      sendResourceRequest(request);
     }
   }
 
@@ -110,6 +109,7 @@ public class ResourceRequestState {
   public void cancelResourceRequest(SamzaResourceRequest request) {
     log.info("Canceling resource request for Processor ID: {} on host: {}", 
request.getProcessorId(), request.getPreferredHost());
     synchronized (lock) {
+      delayedRequestsQueue.remove(request);
       requestsQueue.remove(request);
       if (hostAffinityEnabled) {
         // assignedHost may not always be the preferred host.
@@ -206,6 +206,22 @@ public class ResourceRequestState {
   }
 
   /**
+   * Sends the {@link SamzaResourceRequest}s in the delayed requests queue 
that have expired.
+   * @return number of delayed requests sent.
+   */
+  public int sendPendingDelayedResourceRequests() {
+    synchronized (lock) {
+      int numMoved = 0;
+      Instant now = Instant.now();
+      while (!delayedRequestsQueue.isEmpty() && 
delayedRequestsQueue.peek().getRequestTimestamp().isBefore(now)) {
+        sendResourceRequest(delayedRequestsQueue.poll());
+        numMoved++;
+      }
+      return numMoved;
+    }
+  }
+
+  /**
    * If requestQueue is empty, all extra resources in the buffer should be 
released and update the entire system's state
    * Needs to be synchronized because it is modifying shared state buffers
    * @return the number of resources released.
@@ -275,6 +291,36 @@ public class ResourceRequestState {
     }
   }
 
+  /**
+   * Sends the request to the {@link ClusterResourceManager} while queuing the 
request to be matched with the returned
+   * {@link SamzaResource}. Caller must call this in a synchronized block.
+   * @param request to be sent.
+   */
+  @VisibleForTesting
+  void sendResourceRequest(SamzaResourceRequest request) {
+    requestsQueue.add(request);
+    String preferredHost = request.getPreferredHost();
+
+    // if host affinity is enabled, update state.
+    if (hostAffinityEnabled) {
+      //increment # of requests on the host.
+      if (hostRequestCounts.containsKey(preferredHost)) {
+        hostRequestCounts.get(preferredHost).incrementAndGet();
+      } else {
+        hostRequestCounts.put(preferredHost, new AtomicInteger(1));
+      }
+      /**
+       * The following is important to correlate allocated resource data with 
the requestsQueue made before. If
+       * the preferredHost is requested for the first time, the state should 
reflect that the allocatedResources
+       * list is empty and NOT null.
+       */
+
+      if (!allocatedResources.containsKey(preferredHost)) {
+        allocatedResources.put(preferredHost, new ArrayList<>());
+      }
+    }
+    manager.requestResources(request);
+  }
 
   /**
    * Releases all allocated resources for the specified host.
@@ -359,7 +405,7 @@ public class ResourceRequestState {
    */
   public SamzaResourceRequest peekPendingRequest() {
     synchronized (lock) {
-      return this.requestsQueue.peek();
+      return requestsQueue.peek();
     }
   }
 
@@ -369,10 +415,19 @@ public class ResourceRequestState {
    */
   public int numPendingRequests() {
     synchronized (lock) {
-      return this.requestsQueue.size();
+      return requestsQueue.size();
     }
   }
 
+  /**
+   * Returns the number of delayed SamzaResource requests in the queue.
+   * @return the number of delayed requests
+   */
+  public int numDelayedRequests() {
+    synchronized (lock) {
+      return delayedRequestsQueue.size();
+    }
+  }
 
   /**
    * Returns the list of resources allocated on a given host. If no resources 
were ever allocated on
@@ -393,7 +448,19 @@ public class ResourceRequestState {
   }
 
   // Package private, used only in tests.
+  @VisibleForTesting
   Map<String, AtomicInteger> getHostRequestCounts() {
     return Collections.unmodifiableMap(hostRequestCounts);
   }
+
+  @VisibleForTesting
+  DelayedRequestQueue getDelayedRequestsQueue() {
+    return delayedRequestsQueue;
+  }
+
+  static class DelayedRequestQueue extends PriorityQueue<SamzaResourceRequest> 
{
+    DelayedRequestQueue() {
+      super(Comparator.comparingLong(request -> 
request.getRequestTimestamp().toEpochMilli()));
+    }
+  }
 }
\ No newline at end of file
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 78a9dcc..5ca8b24 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -19,11 +19,11 @@
 
 package org.apache.samza.clustermanager;
 
+import java.time.Instant;
+import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.UUID;
-
 /**
  * Specification of a Request for resources from a ClusterResourceManager. A
  * resource request currently includes cpu cores and memory in MB. A preferred 
host
@@ -60,24 +60,28 @@ public class SamzaResourceRequest implements 
Comparable<SamzaResourceRequest> {
   /**
    * The timestamp in millis when the request was created.
    */
-  private final long requestTimestampMs;
+  private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String 
preferredHost, String processorId) {
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String 
preferredHost, String processorId, Instant requestTimestamp) {
     this.numCores = numCores;
     this.memoryMB = memoryMB;
     this.preferredHost = preferredHost;
     this.requestId = UUID.randomUUID().toString();
     this.processorId = processorId;
-    this.requestTimestampMs = System.currentTimeMillis();
-    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at 
time: {} with Request ID: {}", this.processorId, this.preferredHost, 
this.requestTimestampMs, this.requestId);
+    this.requestTimestamp = requestTimestamp;
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at 
time: {} with Request ID: {}", this.processorId, this.preferredHost, 
this.requestTimestamp, this.requestId);
   }
 
   public String getProcessorId() {
     return processorId;
   }
 
-  public long getRequestTimestampMs() {
-    return requestTimestampMs;
+  public Instant getRequestTimestamp() {
+    return requestTimestamp;
   }
 
   public String getRequestId() {
@@ -104,29 +108,34 @@ public class SamzaResourceRequest implements 
Comparable<SamzaResourceRequest> {
             ", preferredHost='" + preferredHost + '\'' +
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
-            ", requestTimestampMs=" + requestTimestampMs +
+            ", requestTimestampMs=" + requestTimestamp +
             '}';
   }
 
   /**
    * Requests are ordered by the processor type and the time at which they 
were created.
-   * Active processors take precedence over standby processors, regardless of 
timestamp.
+   * Requests with timestamps in the future for retries take less precedence 
than timestamps in the past or current.
+   * Otherwise, active processors take precedence over standby processors, 
regardless of timestamp.
    * @param o the other
    */
   @Override
   public int compareTo(SamzaResourceRequest o) {
-    if (!StandbyTaskUtil.isStandbyContainer(this.processorId) && 
StandbyTaskUtil.isStandbyContainer(o.processorId)) {
+    if (!StandbyTaskUtil.isStandbyContainer(processorId) && 
StandbyTaskUtil.isStandbyContainer(o.processorId)) {
       return -1;
     }
 
-    if (StandbyTaskUtil.isStandbyContainer(this.processorId) && 
!StandbyTaskUtil.isStandbyContainer(o.processorId)) {
+    if (StandbyTaskUtil.isStandbyContainer(processorId) && 
!StandbyTaskUtil.isStandbyContainer(o.processorId)) {
       return 1;
     }
 
-    if (this.requestTimestampMs < o.requestTimestampMs)
+    if (requestTimestamp.isBefore(o.requestTimestamp)) {
       return -1;
-    if (this.requestTimestampMs > o.requestTimestampMs)
+    }
+
+    if (requestTimestamp.isAfter(o.requestTimestamp)) {
       return 1;
+    }
+
     return 0;
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 8f6d675..0a114ab 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -82,10 +83,10 @@ public class StandbyContainerManager {
    * @param containerAllocator the container allocator
    */
   public void handleContainerStop(String containerID, String resourceID, 
String preferredHost, int exitStatus,
-      AbstractContainerAllocator containerAllocator) {
+      AbstractContainerAllocator containerAllocator, Duration 
preferredHostRetryDelay) {
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
-      handleStandbyContainerStop(containerID, resourceID, preferredHost, 
containerAllocator);
+      handleStandbyContainerStop(containerID, resourceID, preferredHost, 
containerAllocator, preferredHostRetryDelay);
     } else {
       // initiate failover for the active container based on the exitStatus
       switch (exitStatus) {
@@ -98,7 +99,8 @@ public class StandbyContainerManager {
         // if this request expires, we can do a failover -- select a standby 
to stop & start the active on standby's host
         default:
           log.info("Requesting resource for active-container {} on host {}", 
containerID, preferredHost);
-          SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(containerID, preferredHost);
+          SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequestWithDelay(containerID, preferredHost, 
preferredHostRetryDelay);
+
           FailoverMetadata failoverMetadata = 
registerActiveContainerFailure(containerID, resourceID);
           failoverMetadata.recordResourceRequest(resourceRequest);
           containerAllocator.issueResourceRequest(resourceRequest);
@@ -135,7 +137,7 @@ public class StandbyContainerManager {
    * @param preferredHost Preferred host of the standby container
    */
   private void handleStandbyContainerStop(String standbyContainerID, String 
resourceID, String preferredHost,
-      AbstractContainerAllocator containerAllocator) {
+      AbstractContainerAllocator containerAllocator, Duration 
preferredHostRetryDelay) {
 
     // if this standbyContainerResource was stopped for a failover, we will 
find a metadata entry
     Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = 
this.checkIfUsedForFailover(resourceID);
@@ -149,7 +151,7 @@ public class StandbyContainerManager {
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequest(activeContainerID, 
standbyContainerHostname);
+          containerAllocator.getResourceRequestWithDelay(activeContainerID, 
standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with 
allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
@@ -159,7 +161,7 @@ public class StandbyContainerManager {
     } else {
       log.info("Issuing request for standby container {} on host {}, since 
this is not for a failover",
           standbyContainerID, preferredHost);
-      containerAllocator.requestResource(standbyContainerID, preferredHost);
+      containerAllocator.requestResourceWithDelay(standbyContainerID, 
preferredHost, preferredHostRetryDelay);
     }
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index eda1be8..18d3954 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -20,6 +20,7 @@
 
 package org.apache.samza.config;
 
+import java.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,7 +88,17 @@ public class ClusterManagerConfig extends MapConfig {
    */
   public static final String CONTAINER_RETRY_COUNT = 
"yarn.container.retry.count";
   public static final String CLUSTER_MANAGER_CONTAINER_RETRY_COUNT = 
"cluster-manager.container.retry.count";
-  private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+  public static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+
+  /**
+   * Maximum delay in milliseconds for the last container retry
+   */
+  public static final String 
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS =
+      "cluster-manager.container.preferred-host.last.retry.delay.ms";
+  private static final long 
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA =
+      Duration.ofSeconds(1).toMillis();
+  private static final long 
DEFAULT_CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS =
+      Duration.ofMinutes(6).toMillis() + 
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA;
 
   /**
    * The cluster managed job coordinator sleeps for a configurable time before 
checking again for termination.
@@ -181,6 +192,14 @@ public class ClusterManagerConfig extends MapConfig {
     }
   }
 
+  public long getContainerPreferredHostLastRetryDelayMs() {
+    if 
(containsKey(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS)) {
+      return 
getLong(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS) + 
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA;
+    } else {
+      return 
DEFAULT_CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS;
+    }
+  }
+
   public int getContainerRetryWindowMs() {
     if (containsKey(CLUSTER_MANAGER_RETRY_WINDOW_MS)) {
       return getInt(CLUSTER_MANAGER_RETRY_WINDOW_MS);
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 39a767d..0f90f92 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.clustermanager;
 
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -30,10 +33,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -157,15 +156,16 @@ public class TestContainerAllocator {
 
     assertNotNull(requestState);
 
-    assertEquals(requestState.numPendingRequests(), 4);
+    assertEquals(4, requestState.numPendingRequests());
+    assertEquals(0, requestState.numDelayedRequests());
 
     // If host-affinty is not enabled, it doesn't update the requestMap
     assertNotNull(requestState.getHostRequestCounts());
-    assertEquals(requestState.getHostRequestCounts().keySet().size(), 0);
+    assertEquals(0, requestState.getHostRequestCounts().keySet().size());
 
     assertNotNull(state);
-    assertEquals(state.anyHostRequests.get(), 4);
-    assertEquals(state.preferredHostRequests.get(), 0);
+    assertEquals(4, state.anyHostRequests.get());
+    assertEquals(0, state.preferredHostRequests.get());
   }
 
   /**
@@ -184,11 +184,12 @@ public class TestContainerAllocator {
 
     assertNotNull(requestState);
 
-    assertTrue(requestState.numPendingRequests() == 4);
+    assertEquals(4, requestState.numPendingRequests());
+    assertEquals(0, requestState.numDelayedRequests());
 
     // If host-affinty is not enabled, it doesn't update the requestMap
     assertNotNull(requestState.getHostRequestCounts());
-    assertTrue(requestState.getHostRequestCounts().keySet().size() == 0);
+    assertEquals(0, requestState.getHostRequestCounts().keySet().size());
   }
 
   /**
@@ -213,6 +214,7 @@ public class TestContainerAllocator {
 
         // Test that state is cleaned up
         assertEquals(0, requestState.numPendingRequests());
+        assertEquals(0, requestState.numDelayedRequests());
         assertEquals(0, requestState.getHostRequestCounts().size());
         assertNull(requestState.getResourcesOnAHost("abc"));
         assertNull(requestState.getResourcesOnAHost("def"));
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 99b6bdd..997adb1 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Field;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,7 +50,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestContainerProcessManager {
@@ -80,15 +89,13 @@ public class TestContainerProcessManager {
   }
 
   private Config getConfigWithHostAffinity() {
-    Map<String, String> map = new HashMap<>();
-    map.putAll(config);
-    map.put("job.host-affinity.enabled", "true");
-    return new MapConfig(map);
+    return getConfigWithHostAffinityAndRetries(true, 1);
   }
 
-  private Config getConfigWithRetries(int maxRetries) {
+  private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity, 
int maxRetries) {
     Map<String, String> map = new HashMap<>();
     map.putAll(config);
+    map.put("job.host-affinity.enabled", String.valueOf(withHostAffinity));
     map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_RETRY_COUNT, 
String.valueOf(maxRetries));
     return new MapConfig(map);
   }
@@ -117,7 +124,7 @@ public class TestContainerProcessManager {
     server = new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class));
   }
 
-  private Field getPrivateFieldFromTaskManager(String fieldName, 
ContainerProcessManager object) throws Exception {
+  private Field getPrivateFieldFromCpm(String fieldName, 
ContainerProcessManager object) throws Exception {
     Field field = object.getClass().getDeclaredField(fieldName);
     field.setAccessible(true);
     return field;
@@ -135,11 +142,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
 
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(new ClusterManagerConfig(new 
MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
     AbstractContainerAllocator allocator =
-        (AbstractContainerAllocator) 
getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).get(taskManager);
+        (AbstractContainerAllocator) 
getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm);
     assertEquals(ContainerAllocator.class, allocator.getClass());
     // Asserts that samza exposed container configs is honored by allocator 
thread
     assertEquals(500, allocator.containerMemoryMb);
@@ -153,8 +160,8 @@ public class TestContainerProcessManager {
     state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host1")));
     callback = new MockClusterResourceManagerCallback();
     clusterResourceManager = new MockClusterResourceManager(callback, state);
-    taskManager = new ContainerProcessManager(
-        new MapConfig(conf),
+    cpm = new ContainerProcessManager(
+        new ClusterManagerConfig(new MapConfig(conf)),
         state,
         new MetricsRegistryMap(),
         clusterResourceManager,
@@ -163,7 +170,7 @@ public class TestContainerProcessManager {
     );
 
     allocator =
-        (AbstractContainerAllocator) 
getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).get(taskManager);
+        (AbstractContainerAllocator) 
getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm);
     assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
     // Asserts that samza exposed container configs is honored by allocator 
thread
     assertEquals(500, allocator.containerMemoryMb);
@@ -176,24 +183,26 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     ClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
+
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
         conf,
         state);
 
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
+    getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
     CountDownLatch latch = new CountDownLatch(1);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, new Thread() {
+    getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
       public void run() {
         isRunning = true;
         latch.countDown();
       }
     });
 
-    taskManager.start();
+    cpm.start();
 
     if (!latch.await(2, TimeUnit.SECONDS)) {
       Assert.fail("timed out waiting for the latch to expire");
@@ -206,7 +215,7 @@ public class TestContainerProcessManager {
     assertEquals(1, state.neededProcessors.get());
     assertEquals(1, allocator.requestedContainers);
 
-    taskManager.stop();
+    cpm.stop();
   }
 
   @Test
@@ -215,66 +224,65 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
 
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
-    taskManager.start();
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.empty());
+    cpm.start();
 
-    Thread allocatorThread = (Thread) 
getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
+    Thread allocatorThread = (Thread) 
getPrivateFieldFromCpm("allocatorThread", cpm).get(cpm);
     assertTrue(allocatorThread.isAlive());
 
-    taskManager.stop();
+    cpm.stop();
 
     assertFalse(allocatorThread.isAlive());
   }
 
   /**
-   * Test Task Manager should stop when all containers finish
+   * Test Container Process Manager should stop when all containers finish
    */
   @Test
-  public void testTaskManagerShouldStopWhenContainersFinish() throws Exception 
{
+  public void testCpmShouldStopWhenContainersFinish() throws Exception {
     Config conf = getConfig();
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
         conf,
         state);
 
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+    ContainerProcessManager cpm =
+        spy(buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator)));
 
     // start triggers a request
-    taskManager.start();
+    cpm.start();
 
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
 
     SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
-    taskManager.onResourceAllocated(container);
+    cpm.onResourceAllocated(container);
 
     // Allow container to run and update state
 
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container);
-    assertFalse(taskManager.shouldShutdown());
+    cpm.onStreamProcessorLaunchSuccess(container);
+    assertFalse(cpm.shouldShutdown());
 
-    taskManager.onResourceCompleted(new SamzaResourceStatus("id0", 
"diagnostics", SamzaResourceStatus.SUCCESS));
-    assertTrue(taskManager.shouldShutdown());
+    cpm.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", 
SamzaResourceStatus.SUCCESS));
+    verify(cpm, 
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class), 
anyString(), anyString(), anyInt());
+    assertTrue(cpm.shouldShutdown());
   }
 
 
   /**
-   * Test Task Manager should request a new container when a task fails with 
unknown exit code
+   * Test Container Process Manager should request a new container when a task 
fails with unknown exit code
    * When host-affinity is not enabled, it will always request for ANY_HOST
    */
   @Test
@@ -283,81 +291,97 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
         conf,
         state);
 
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+    ContainerProcessManager cpm = spy(
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator)));
 
     // start triggers a request
-    taskManager.start();
+    cpm.start();
 
-    assertFalse(taskManager.shouldShutdown());
+    verify(clusterManagerConfig, 
never()).getContainerPreferredHostLastRetryDelayMs();
+    verify(cpm, never()).onResourceCompletedWithUnknownStatus(any(), 
anyString(), anyString(), anyInt());
+    assertFalse(cpm.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
 
     SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
-    taskManager.onResourceAllocated(container);
+    cpm.onResourceAllocated(container);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container);
+    cpm.onStreamProcessorLaunchSuccess(container);
     // Create first container failure
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
+    SamzaResourceStatus samzaResourceStatus = new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1);
+    cpm.onResourceCompleted(samzaResourceStatus);
+
 
     // The above failure should trigger a container request
+    verify(cpm).onResourceCompletedWithUnknownStatus(eq(samzaResourceStatus), 
eq(container.getContainerId()), eq("0"), eq(1));
+    verify(clusterManagerConfig, 
never()).getContainerPreferredHostLastRetryDelayMs();
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
     assertEquals(ResourceRequestState.ANY_HOST, 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
 
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(2, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
 
-    taskManager.onResourceAllocated(container);
+    cpm.onResourceAllocated(container);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container);
+    cpm.onStreamProcessorLaunchSuccess(container);
     assertTrue(state.jobHealthy.get());
 
     // Create a second failure
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
-
+    cpm.onResourceCompleted(samzaResourceStatus);
 
     // The above failure should trigger a job shutdown because our retry count 
is set to 1
+    verify(cpm, 
times(2)).onResourceCompletedWithUnknownStatus(eq(samzaResourceStatus), 
eq(container.getContainerId()), eq("0"), eq(1));
+    verify(clusterManagerConfig, 
never()).getContainerPreferredHostLastRetryDelayMs();
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
     assertEquals(2, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
     assertFalse(state.jobHealthy.get());
-    assertTrue(taskManager.shouldShutdown());
+    assertTrue(cpm.shouldShutdown());
     assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);
 
-    taskManager.stop();
+    cpm.stop();
+  }
+
+  /**
+   * Test scenario where a container fails multiple times but failures are 
more than retryWindow apart without host affinity
+   * @throws Exception
+   */
+  @Test
+  public void 
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity()
 throws Exception {
+    
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false);
   }
 
   /**
-   * Test scenario where a container fails multiple times but failures are 
more than retryWindow apart.
+   * Test scenario where a container fails multiple times but failures are 
more than retryWindow apart with host affinity
    * @throws Exception
    */
   @Test
-  public void 
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode() throws 
Exception {
+  public void 
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithHostAffinity()
 throws Exception {
+    testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true);
+  }
+
+  private void 
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(boolean 
withHostAffinity) throws Exception {
     int maxRetries = 3;
     String processorId = "0";
-    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(getConfigWithRetries(maxRetries));
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, 
maxRetries));
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
@@ -368,7 +392,7 @@ public class TestContainerProcessManager {
         state);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(new MapConfig(clusterManagerConfig), 
state, clusterResourceManager, Optional.of(allocator));
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator));
 
     // start triggers a request
     cpm.start();
@@ -387,7 +411,8 @@ public class TestContainerProcessManager {
 
     // Mock 2nd failure exceeding retry window.
     int longWindow = clusterManagerConfig.getContainerRetryWindowMs() + 10;
-    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1, 
Instant.now().minusMillis(longWindow).toEpochMilli()));
+    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1, 
Instant.now().minusMillis(longWindow), Duration.ZERO));
+    assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
     cpm.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
     assertEquals(false, cpm.getJobFailureCriteriaMet());
     assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
@@ -401,7 +426,7 @@ public class TestContainerProcessManager {
     cpm.onStreamProcessorLaunchSuccess(container);
 
     // Mock 3rd failure exceeding retry window.
-    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2, 
Instant.now().minusMillis(longWindow).toEpochMilli()));
+    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2, 
Instant.now().minusMillis(longWindow), Duration.ZERO));
     cpm.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
     assertEquals(false, cpm.getJobFailureCriteriaMet());
     assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
@@ -415,7 +440,7 @@ public class TestContainerProcessManager {
     cpm.onStreamProcessorLaunchSuccess(container);
 
     // Mock 4th failure exceeding retry window.
-    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3, 
Instant.now().minusMillis(longWindow).toEpochMilli()));
+    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3, 
Instant.now().minusMillis(longWindow), Duration.ZERO));
     cpm.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
     assertEquals(false, cpm.getJobFailureCriteriaMet());
     assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
@@ -424,10 +449,19 @@ public class TestContainerProcessManager {
   }
 
   @Test
-  public void 
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode() 
throws Exception {
+  public void 
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity()
 throws Exception {
+    
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false);
+  }
+
+  @Test
+  public void 
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithHostAffinity()
 throws Exception {
+    
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true);
+  }
+
+  private void 
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(boolean 
withHostAffinity) throws Exception {
     int maxRetries = 3;
     String processorId = "0";
-    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(getConfigWithRetries(maxRetries));
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, 
maxRetries));
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
@@ -438,13 +472,14 @@ public class TestContainerProcessManager {
         state);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(new MapConfig(clusterManagerConfig), 
state, clusterResourceManager, Optional.of(allocator));
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator));
 
     // start triggers a request
     cpm.start();
 
     assertFalse(cpm.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
 
     SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
     cpm.onResourceAllocated(container);
@@ -456,10 +491,13 @@ public class TestContainerProcessManager {
     cpm.onStreamProcessorLaunchSuccess(container);
 
     // Mock 2nd failure not exceeding retry window.
-    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1, 
Instant.now().toEpochMilli()));
+    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1, 
Instant.now(), Duration.ZERO));
     cpm.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
     assertEquals(false, cpm.getJobFailureCriteriaMet());
     assertEquals(2, cpm.getProcessorFailures().get(processorId).getCount());
+    assertFalse(cpm.shouldShutdown());
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
 
     cpm.onResourceAllocated(container);
 
@@ -470,24 +508,53 @@ public class TestContainerProcessManager {
     cpm.onStreamProcessorLaunchSuccess(container);
 
     // Mock 3rd failure not exceeding retry window.
-    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2, 
Instant.now().toEpochMilli()));
+    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2, 
Instant.now(), Duration.ZERO));
     cpm.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
     assertEquals(false, cpm.getJobFailureCriteriaMet());
     assertEquals(3, cpm.getProcessorFailures().get(processorId).getCount());
+    assertFalse(cpm.shouldShutdown());
+
+    if (withHostAffinity) {
+      assertEquals(0, 
allocator.getContainerRequestState().numPendingRequests());
+      assertEquals(1, 
allocator.getContainerRequestState().numDelayedRequests());
+    } else {
+      assertEquals(1, 
allocator.getContainerRequestState().numPendingRequests());
+      assertEquals(0, 
allocator.getContainerRequestState().numDelayedRequests());
+    }
 
     cpm.onResourceAllocated(container);
 
-    // Allow container to run and update state
+    if (withHostAffinity) {
+      if (allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+        // Host affinity is enabled, so the retry request is delayed and the 
callback should time out instead of returning immediately.
+        fail("Expecting a delayed request so allocator callback should have 
timed out waiting for a response.");
+      }
+
+      // For the sake of testing the mocked 4th failure below, send delayed 
requests now.
+      SamzaResourceRequest request = 
allocator.getContainerRequestState().getDelayedRequestsQueue().poll();
+      SamzaResourceRequest fastForwardRequest =
+          new SamzaResourceRequest(request.getNumCores(), 
request.getMemoryMB(), request.getPreferredHost(), request.getProcessorId(), 
Instant.now().minusSeconds(1));
+      
allocator.getContainerRequestState().getDelayedRequestsQueue().add(fastForwardRequest);
+      int numSent = 
allocator.getContainerRequestState().sendPendingDelayedResourceRequests();
+      assertEquals(1, numSent);
+      cpm.onResourceAllocated(container);
+    }
+
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
-      fail("timed out waiting for the containers to start");
+      // No request is delayed now (no host affinity, or fast-forwarded above), 
so the callback should return immediately.
+      fail("Timed out waiting for the containers to start");
     }
+
     cpm.onStreamProcessorLaunchSuccess(container);
 
     // Mock 4th failure not exceeding retry window.
-    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3, 
Instant.now().toEpochMilli()));
+    cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3, 
Instant.now(), Duration.ZERO));
     cpm.onResourceCompleted(new 
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
     assertEquals(true, cpm.getJobFailureCriteriaMet()); // expecting failed 
container
     assertEquals(3, cpm.getProcessorFailures().get(processorId).getCount()); 
// count won't update on failure
+    assertTrue(cpm.shouldShutdown());
+    assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
 
     cpm.stop();
   }
@@ -496,29 +563,24 @@ public class TestContainerProcessManager {
   public void testInvalidNotificationsAreIgnored() throws Exception {
     Config conf = getConfig();
 
-    Map<String, String> config = new HashMap<>();
-    config.putAll(getConfig());
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
         conf,
         state);
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
 
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+    ContainerProcessManager cpm =
+        spy(buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator)));
 
     // Start the task clusterResourceManager
-    taskManager.start();
+    cpm.start();
 
     SamzaResource container = new SamzaResource(1, 1000, "host1", "id1");
-    taskManager.onResourceAllocated(container);
+    cpm.onResourceAllocated(container);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
@@ -526,11 +588,12 @@ public class TestContainerProcessManager {
     }
 
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus("invalidContainerID", "Disk failure", 
SamzaResourceStatus.DISK_FAIL));
+    cpm.onResourceCompleted(new SamzaResourceStatus("invalidContainerID", 
"Disk failure", SamzaResourceStatus.DISK_FAIL));
+    verify(cpm, 
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class), 
anyString(), anyString(), anyInt());
 
     // The above failure should not trigger any container requests, since it 
is for an invalid container ID
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertTrue(state.jobHealthy.get());
     assertEquals(state.redundantNotifications.get(), 1);
   }
@@ -549,7 +612,7 @@ public class TestContainerProcessManager {
         state);
 
     ContainerProcessManager manager =
-        new ContainerProcessManager(config, state, new MetricsRegistryMap(), 
clusterResourceManager,
+        new ContainerProcessManager(new ClusterManagerConfig(config), state, 
new MetricsRegistryMap(), clusterResourceManager,
             Optional.of(allocator), getClass().getClassLoader());
 
     manager.start();
@@ -566,6 +629,7 @@ public class TestContainerProcessManager {
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfigWithHostAffinity());
     config.put("job.container.count", "2");
+    config.put("cluster-manager.container.retry.count", "2");
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host1",
@@ -573,41 +637,38 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
 
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(cfg, state, clusterResourceManager, 
Optional.empty());
-
     MockHostAwareContainerAllocator allocator = new 
MockHostAwareContainerAllocator(
         clusterResourceManager,
         cfg,
         state);
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
 
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+    ContainerProcessManager cpm =
+        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, 
clusterResourceManager, Optional.of(allocator)));
 
-    taskManager.start();
-    assertFalse(taskManager.shouldShutdown());
+    cpm.start();
+    assertFalse(cpm.shouldShutdown());
     // 2. When the task manager starts, there should have been a pending 
request on host1 and host2
     assertEquals(2, allocator.getContainerRequestState().numPendingRequests());
 
     // 3. Allocate an extra resource on host1 and no resource on host2 yet.
     SamzaResource resource1 = new SamzaResource(1, 1000, "host1", "id1");
     SamzaResource resource2 = new SamzaResource(1, 1000, "host1", "id2");
-    taskManager.onResourceAllocated(resource1);
-    taskManager.onResourceAllocated(resource2);
+    cpm.onResourceAllocated(resource1);
+    cpm.onResourceAllocated(resource2);
 
     // 4. Wait for the container to start on host1 and immediately fail
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(resource1);
+    cpm.onStreamProcessorLaunchSuccess(resource1);
     assertEquals("host2", 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(resource1.getContainerId(), "App Error", 1));
+    cpm.onResourceCompleted(new 
SamzaResourceStatus(resource1.getContainerId(), "App Error", 1));
+    
verify(cpm).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
 anyString(), anyString(), anyInt());
     assertEquals(2, allocator.getContainerRequestState().numPendingRequests());
 
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(3, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
@@ -615,13 +676,13 @@ public class TestContainerProcessManager {
     // 5. Do not allocate any further resource on host1, and verify that the 
re-run of the container on host1 uses the
     // previously allocated extra resource
     SamzaResource resource3 = new SamzaResource(1, 1000, "host2", "id3");
-    taskManager.onResourceAllocated(resource3);
+    cpm.onResourceAllocated(resource3);
 
     if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(resource2);
-    taskManager.onStreamProcessorLaunchSuccess(resource3);
+    cpm.onStreamProcessorLaunchSuccess(resource2);
+    cpm.onStreamProcessorLaunchSuccess(resource3);
 
     assertTrue(state.jobHealthy.get());
   }
@@ -635,58 +696,57 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(new MapConfig(conf)));
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
         conf,
         state);
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
 
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+    ContainerProcessManager cpm =
+        spy(buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator)));
 
     // Start the task manager
-    taskManager.start();
-    assertFalse(taskManager.shouldShutdown());
+    cpm.start();
+    assertFalse(cpm.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
     SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1");
-    taskManager.onResourceAllocated(container1);
+    cpm.onResourceAllocated(container1);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container1);
+    cpm.onStreamProcessorLaunchSuccess(container1);
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
 
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container1.getContainerId(), "Disk failure", 
SamzaResourceStatus.DISK_FAIL));
+    cpm.onResourceCompleted(new 
SamzaResourceStatus(container1.getContainerId(), "Disk failure", 
SamzaResourceStatus.DISK_FAIL));
+    verify(cpm, 
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class), 
anyString(), anyString(), anyInt());
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(2, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
     SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2");
-    taskManager.onResourceAllocated(container2);
+    cpm.onResourceAllocated(container2);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container2);
+    cpm.onStreamProcessorLaunchSuccess(container2);
 
     assertTrue(state.jobHealthy.get());
 
     // Simulate a duplicate notification for container 1 with a different exit 
code
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container1.getContainerId(), "Disk failure", 
SamzaResourceStatus.PREEMPTED));
+    cpm.onResourceCompleted(new 
SamzaResourceStatus(container1.getContainerId(), "Disk failure", 
SamzaResourceStatus.PREEMPTED));
+    verify(cpm, 
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class), 
anyString(), anyString(), anyInt());
     // assert that a duplicate notification does not change metrics (including 
job health)
     assertEquals(state.redundantNotifications.get(), 1);
     assertEquals(2, clusterResourceManager.resourceRequests.size());
@@ -707,83 +767,88 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(new MapConfig(config)));
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
         conf,
         state);
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
 
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+    ContainerProcessManager cpm =
+        spy(buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator)));
 
     // Start the task clusterResourceManager
-    taskManager.start();
-    assertFalse(taskManager.shouldShutdown());
+    cpm.start();
+    assertFalse(cpm.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
     SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1");
-    taskManager.onResourceAllocated(container1);
+    cpm.onResourceAllocated(container1);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
-    taskManager.onStreamProcessorLaunchSuccess(container1);
+    cpm.onStreamProcessorLaunchSuccess(container1);
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container1.getContainerId(), "App error", 1));
+    SamzaResourceStatus resourceStatusOnAppError = new 
SamzaResourceStatus(container1.getContainerId(), "App error", 1);
+    cpm.onResourceCompleted(resourceStatusOnAppError);
+    
verify(cpm).onResourceCompletedWithUnknownStatus(eq(resourceStatusOnAppError), 
anyString(), anyString(), anyInt());
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(2, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
     SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2");
-    taskManager.onResourceAllocated(container2);
+    cpm.onResourceAllocated(container2);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container2);
+    cpm.onStreamProcessorLaunchSuccess(container2);
 
     // Create container failure - with ContainerExitStatus.PREEMPTED
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container2.getContainerId(), "Preemption", 
SamzaResourceStatus.PREEMPTED));
+    SamzaResourceStatus resourceStatusOnPreemption =
+        new SamzaResourceStatus(container2.getContainerId(), "Preemption", 
SamzaResourceStatus.PREEMPTED);
+    cpm.onResourceCompleted(resourceStatusOnPreemption);
+    verify(cpm, 
never()).onResourceCompletedWithUnknownStatus(eq(resourceStatusOnPreemption), 
anyString(), anyString(), anyInt());
     assertEquals(3, clusterResourceManager.resourceRequests.size());
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(ResourceRequestState.ANY_HOST, 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
     SamzaResource container3 = new SamzaResource(1, 1000, "host1", "id3");
-    taskManager.onResourceAllocated(container3);
+    cpm.onResourceAllocated(container3);
 
     // Allow container to run and update state
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-    taskManager.onStreamProcessorLaunchSuccess(container3);
+    cpm.onStreamProcessorLaunchSuccess(container3);
 
     // Create container failure - with ContainerExitStatus.ABORTED
-    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container3.getContainerId(), "Aborted", 
SamzaResourceStatus.ABORTED));
+    SamzaResourceStatus resourceStatusOnAborted =
+        new SamzaResourceStatus(container3.getContainerId(), "Aborted", 
SamzaResourceStatus.ABORTED);
+    cpm.onResourceCompleted(resourceStatusOnAborted);
+    verify(cpm, 
never()).onResourceCompletedWithUnknownStatus(eq(resourceStatusOnAborted), 
anyString(), anyString(), anyInt());
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
     assertEquals(4, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
-    assertFalse(taskManager.shouldShutdown());
+    assertFalse(cpm.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(ResourceRequestState.ANY_HOST, 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
-    taskManager.stop();
+    cpm.stop();
   }
 
   @Test
@@ -793,20 +858,21 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
 
-    ContainerProcessManager taskManager =
-        buildContainerProcessManager(new MapConfig(conf), state, 
clusterResourceManager, Optional.empty());
-    taskManager.start();
+
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(new ClusterManagerConfig(conf), state, 
clusterResourceManager, Optional.empty());
+    cpm.start();
     SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
-    assertFalse(taskManager.shouldShutdown());
-    taskManager.onResourceAllocated(container2);
+    assertFalse(cpm.shouldShutdown());
+    cpm.onResourceAllocated(container2);
 
     configVals.put(JobConfig.SAMZA_FWK_PATH, "/export/content/whatever");
     Config config1 = new MapConfig(configVals);
 
-    ContainerProcessManager taskManager1 =
-        buildContainerProcessManager(new MapConfig(config), state, 
clusterResourceManager, Optional.empty());
-    taskManager1.start();
-    taskManager1.onResourceAllocated(container2);
+    ContainerProcessManager cpm1 =
+        buildContainerProcessManager(new ClusterManagerConfig(config), state, 
clusterResourceManager, Optional.empty());
+    cpm1.start();
+    cpm1.onResourceAllocated(container2);
   }
 
   @After
@@ -814,9 +880,9 @@ public class TestContainerProcessManager {
     server.stop();
   }
 
-  private ContainerProcessManager buildContainerProcessManager(Config config, 
SamzaApplicationState state,
+  private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, 
Optional<AbstractContainerAllocator> allocator) {
-    return new ContainerProcessManager(config, state, new 
MetricsRegistryMap(), clusterResourceManager, allocator,
+    return new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(), clusterResourceManager, allocator,
         getClass().getClassLoader());
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index caae4a7..1877547 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -19,12 +19,26 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 
 public class TestContainerRequestState {
 
@@ -89,7 +103,7 @@ public class TestContainerRequestState {
     assertEquals(resource, state.getResourcesOnAHost(ANY_HOST).get(0));
 
     // Container Allocated when there is no request in queue
-    ResourceRequestState state1 = new ResourceRequestState(true, manager);
+    ResourceRequestState state1 = spy(new ResourceRequestState(true, manager));
     SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id2");
     state1.addResource(container1);
 
@@ -103,7 +117,17 @@ public class TestContainerRequestState {
     // Container Allocated on a Requested Host
     state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", "0"));
 
+    // Delayed request
+    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "def", "1",
+        Instant.now().plus(Duration.ofHours(1))));
+    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "ghi", "2",
+        Instant.now().plus(Duration.ofHours(2))));
+
     assertEquals(1, state1.numPendingRequests());
+    assertEquals(2, state1.numDelayedRequests());
+
+    // Verify request sent only once for the non-delayed request
+    verify(state1).sendResourceRequest(any(SamzaResourceRequest.class));
 
     assertNotNull(state1.getHostRequestCounts());
     assertNotNull(state1.getHostRequestCounts().get("abc"));
@@ -215,6 +239,74 @@ public class TestContainerRequestState {
     state.releaseResource("id1");
     assertEquals(0, state.getResourcesOnAHost("abc").size());
     assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
+  }
+
+  @Test
+  public void testPriorityQueueOrdering() {
+    PriorityQueue<SamzaResourceRequest> pq = new PriorityQueue<>();
+    Instant now = Instant.now();
+
+    ImmutableList<SamzaResourceRequest> expectedOrder = ImmutableList.of(
+        createRequestForActive(now.minusSeconds(120)),
+        createRequestForActive(now),
+        createRequestForActive(now.plusSeconds(120)),
+        createRequestForActive(now.plusSeconds(240)),
+        createRequestForStandby(now.minusSeconds(120)),
+        createRequestForStandby(now),
+        createRequestForStandby(now.plusSeconds(120)),
+        createRequestForStandby(now.plusSeconds(240)));
+
+    SamzaResourceRequest[] copyExpectedOrder = new 
SamzaResourceRequest[expectedOrder.size()];
+    copyExpectedOrder = expectedOrder.toArray(copyExpectedOrder);
+    List<SamzaResourceRequest> shuffled = Arrays.asList(copyExpectedOrder);
+
+    Collections.shuffle(shuffled, new Random(Instant.now().toEpochMilli()));
+    pq.addAll(shuffled);
+
+    ArrayList priorityQueueOrder = new ArrayList();
+    for (int i = 0; i < expectedOrder.size(); ++i) {
+      priorityQueueOrder.add(pq.poll());
+    }
+    assertEquals(expectedOrder, priorityQueueOrder);
+  }
+
+  @Test
+  public void testDelayedQueueOrdering() {
+    ResourceRequestState.DelayedRequestQueue delayedRequestQueue = new 
ResourceRequestState.DelayedRequestQueue();
+    Instant now = Instant.now();
+
+    // Expected priority by request timestamp only, regardless of active or 
standby
+    ImmutableList<SamzaResourceRequest> expectedOrder = ImmutableList.of(
+        createRequestForActive(now),
+        createRequestForStandby(now.plusSeconds(60)),
+        createRequestForActive(now.plusSeconds(120)),
+        createRequestForStandby(now.plusSeconds(121)),
+        createRequestForActive(now.plusSeconds(240)),
+        createRequestForStandby(now.plusSeconds(241)));
+
+    SamzaResourceRequest[] copyExpectedOrder = new 
SamzaResourceRequest[expectedOrder.size()];
+    copyExpectedOrder = expectedOrder.toArray(copyExpectedOrder);
+    List<SamzaResourceRequest> shuffled = Arrays.asList(copyExpectedOrder);
+
+    Collections.shuffle(shuffled, new Random(Instant.now().toEpochMilli()));
+    delayedRequestQueue.addAll(shuffled);
+
+    ArrayList priorityQueueOrder = new ArrayList();
+    for (int i = 0; i < expectedOrder.size(); ++i) {
+      priorityQueueOrder.add(delayedRequestQueue.poll());
+    }
+    assertEquals(expectedOrder, priorityQueueOrder);
+  }
+
+  SamzaResourceRequest createRequestForActive(Instant requestTime) {
+    String randomHost = RandomStringUtils.randomAlphanumeric(4);
+    String randomId = RandomStringUtils.randomAlphanumeric(8);
+    return new SamzaResourceRequest(1, 1, randomHost, randomId, requestTime);
+  }
 
+  SamzaResourceRequest createRequestForStandby(Instant requestTime) {
+    String randomHost = RandomStringUtils.randomAlphanumeric(4);
+    String randomId = RandomStringUtils.randomAlphanumeric(8) + "-standby"; // 
hyphen in ID denotes a standby processor
+    return new SamzaResourceRequest(1, 1, randomHost, randomId, requestTime);
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index d9dfe45..40d292c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -18,14 +18,14 @@
  */
 package org.apache.samza.clustermanager;
 
+import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Field;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
@@ -249,10 +249,22 @@ public class TestHostAwareContainerAllocator {
     assertEquals(1, requestsMap.get(ResourceRequestState.ANY_HOST).get());
   }
 
+  @Test
+  public void testDelayedRequestedContainers() {
+    containerAllocator.requestResource("0", "abc");
+    containerAllocator.requestResourceWithDelay("0", "efg", 
Duration.ofHours(2));
+    containerAllocator.requestResourceWithDelay("0", "hij", 
Duration.ofHours(3));
+    containerAllocator.requestResourceWithDelay("0", "klm", 
Duration.ofHours(4));
+
+    assertNotNull(clusterResourceManager.resourceRequests);
+    assertEquals(clusterResourceManager.resourceRequests.size(), 1);
+    assertEquals(requestState.numPendingRequests(), 1);
+    assertEquals(requestState.numDelayedRequests(), 3);
+  }
+
   /**
    * Handles expired requests correctly and assigns ANY_HOST
    */
-
   @Test
   public void testExpiredRequestAreAssignedToAnyHost() throws Exception {
     final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");

Reply via email to