This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b9a76  SEP-19: Allocator changes for standby-aware container 
allocation, and active container failover
e2b9a76 is described below

commit e2b9a76fcba7488a4457907d3915c9a3cbf7cf51
Author: Ray Matharu <[email protected]>
AuthorDate: Mon Mar 4 12:35:05 2019 -0800

    SEP-19: Allocator changes for standby-aware container allocation, and 
active container failover
    
    This PR makes the following changes:
    
    * Adds a map called standbyContainerConstraints which stores standby 
constraints for each container in the job model. The logic for populating the 
map using the job model is added as a Util class.
    
    * Adds a check before runStreamProcessor, where the 
standbyContainerConstraints are checked before launching; if the check succeeds, 
the container is launched. If standbyTasks are not enabled in config, existing 
behaviour is retained.
    
    * Adds logic to handle standbyContainerConstraints check failures, which 
entails releasing the resource, and making an any-host request for the 
container.
    
    * Adds logic in SamzaResourceRequest to order container requests such that 
active containers take precedence over standby containers, and otherwise by 
request timestamp.
    
    * Adds logic in HostAwareContainerAllocator to trap resourceRequests issued 
by the CPM and the HACA, and
    a. if it is an any-host request for an active container, translate it into 
a stop of a standby.
    b. if it is for a standby stopped by us, translate it into a resource 
request for the active on the standby's host and for the standby on any host.
    c. in all other cases, proceed as-is.
    
    * Adds metrics to capture FailedStandbyAllocations (due to standby 
constraints), successful standby allocations, number of failovers of active to 
a valid standby, number of failovers of active to anyhost, in case no standby 
was found, and number of standby-container-stops completed.
    
    Tested on dev setup, a VPC, and a test cluster.
    
    Author: Ray Matharu <[email protected]>
    Author: rmatharu <[email protected]>
    
    Reviewers: Jagadish Venkatraman <[email protected]>
    
    Closes #903 from rmatharu/test-basicStandbyFailover
---
 .../clustermanager/AbstractContainerAllocator.java |  12 +-
 .../clustermanager/ClusterResourceManager.java     |   8 +
 .../clustermanager/ContainerProcessManager.java    |  47 +-
 .../HostAwareContainerAllocator.java               |  33 +-
 .../samza/clustermanager/ResourceRequestState.java |  24 +-
 .../clustermanager/SamzaApplicationState.java      |  23 +-
 .../samza/clustermanager/SamzaResourceRequest.java |  12 +-
 .../clustermanager/StandbyContainerManager.java    | 516 +++++++++++++++++++++
 .../samza/clustermanager/StandbyTaskUtil.java      | 121 +++++
 .../grouper/task/TaskNameGrouperProxy.java         |  21 +-
 .../apache/samza/storage/StorageManagerUtil.java   |   9 +-
 .../metrics/ContainerProcessManagerMetrics.scala   |   4 +
 .../clustermanager/MockClusterResourceManager.java |   5 +
 .../clustermanager/MockContainerRequestState.java  |   4 +-
 .../MockHostAwareContainerAllocator.java           |   3 +-
 .../TestContainerProcessManager.java               |   3 +-
 .../TestHostAwareContainerAllocator.java           |   3 +-
 .../samza/clustermanager/TestStandbyAllocator.java | 126 +++++
 .../samza/job/yarn/YarnClusterResourceManager.java |  26 +-
 19 files changed, 943 insertions(+), 57 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 5547a32..7adb1cc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -194,11 +194,19 @@ public abstract class AbstractContainerAllocator 
implements Runnable {
    * @param preferredHost Name of the host that you prefer to run the 
container on
    */
   public final void requestResource(String containerID, String preferredHost) {
-    SamzaResourceRequest request = new 
SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
+    SamzaResourceRequest request = getResourceRequest(containerID, 
preferredHost);
+    issueResourceRequest(request);
+  }
+
+  public final SamzaResourceRequest getResourceRequest(String containerID, 
String preferredHost) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, 
this.containerMemoryMb,
         preferredHost, containerID);
+  }
+
+  public final void issueResourceRequest(SamzaResourceRequest request) {
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
-    if (ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+    if (ResourceRequestState.ANY_HOST.equals(request.getPreferredHost())) {
       state.anyHostRequests.incrementAndGet();
     } else {
       state.preferredHostRequests.incrementAndGet();
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index f8a8c8b..19f3ef0 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -121,6 +121,14 @@ public abstract class ClusterResourceManager {
    */
   public abstract void launchStreamProcessor(SamzaResource resource, 
CommandBuilder builder);
 
+  /**
+   * Requests the stopping of a StreamProcessor, identified by the given 
resource.
+   * {@link Callback#onResourcesCompleted(List)} will be invoked to indicate 
the completion of this operation.
+   *
+   * @param resource the resource being used for the StreamProcessor.
+   */
+  public abstract void stopStreamProcessor(SamzaResource resource);
+
 
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index a089ed9..e63b425 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Optional;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -77,6 +78,9 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   private final AbstractContainerAllocator containerAllocator;
   private final Thread allocatorThread;
 
+  // The StandbyContainerManager manages standby-aware allocation and failover 
of containers
+  private final Optional<StandbyContainerManager> standbyContainerManager;
+
   /**
    * A standard interface to request resources.
    */
@@ -115,6 +119,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
     this.containerAllocator = allocator;
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    this.standbyContainerManager = Optional.empty();
   }
 
   public ContainerProcessManager(Config config,
@@ -130,8 +135,14 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     this.clusterResourceManager = 
checkNotNull(factory.getClusterResourceManager(this, state));
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
 
+    if (jobConfig.getStandbyTasksEnabled()) {
+      this.standbyContainerManager = Optional.of(new 
StandbyContainerManager(state, clusterResourceManager));
+    } else {
+      this.standbyContainerManager = Optional.empty();
+    }
+
     if (this.hostAffinityEnabled) {
-      this.containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, 
clusterManagerConfig.getContainerRequestTimeout(), config, state);
+      this.containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, 
clusterManagerConfig.getContainerRequestTimeout(), config, 
standbyContainerManager, state);
     } else {
       this.containerAllocator = new ContainerAllocator(clusterResourceManager, 
config, state);
     }
@@ -155,10 +166,10 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     this.clusterResourceManager = resourceManager;
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
-
+    this.standbyContainerManager = Optional.empty();
 
     if (this.hostAffinityEnabled) {
-      this.containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, 
clusterManagerConfig.getContainerRequestTimeout(), config, state);
+      this.containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, 
clusterManagerConfig.getContainerRequestTimeout(), config, 
this.standbyContainerManager, state);
     } else {
       this.containerAllocator = new ContainerAllocator(clusterResourceManager, 
config, state);
     }
@@ -185,14 +196,12 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     clusterResourceManager.start();
 
     log.info("Starting the Samza task manager");
-    final int containerCount = jobConfig.getContainerCount();
 
-    state.containerCount.set(containerCount);
-    state.neededContainers.set(containerCount);
+    
state.containerCount.set(state.jobModelManager.jobModel().getContainers().size());
+    
state.neededContainers.set(state.jobModelManager.jobModel().getContainers().size());
 
     // Request initial set of containers
     Map<String, String> containerToHostMapping = 
state.jobModelManager.jobModel().getAllContainerLocality();
-
     containerAllocator.requestResources(containerToHostMapping);
 
     // Start container allocator thread
@@ -297,8 +306,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         state.neededContainers.incrementAndGet();
         state.jobHealthy.set(false);
 
-          // request a container on new host
-        containerAllocator.requestResource(containerId, 
ResourceRequestState.ANY_HOST);
+        // handle container stop due to node fail
+        this.handleContainerStop(containerId, containerStatus.getResourceID(), 
ResourceRequestState.ANY_HOST, exitStatus);
         break;
 
       default:
@@ -371,9 +380,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         }
 
         if (!tooManyFailedContainers) {
-          log.info("Requesting a new container ");
-          // Request a new container
-          containerAllocator.requestResource(containerId, lastSeenOn);
+          handleContainerStop(containerId, containerStatus.getResourceID(), 
lastSeenOn, exitStatus);
         }
 
     }
@@ -428,8 +435,11 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     String containerId = getPendingContainerId(resource.getResourceID());
     log.info("Failed container ID: {} for resourceId: {}", containerId, 
resource.getResourceID());
 
-    // 3. Re-request resources on ANY_HOST in case of launch failures on the 
preferred host.
-    if (containerId != null) {
+    // 3. Re-request resources on ANY_HOST in case of launch failures on the 
preferred host, if standby is not enabled;
+    // otherwise delegate to the standbyContainerManager
+    if (containerId != null && standbyContainerManager.isPresent()) {
+      
this.standbyContainerManager.get().handleContainerLaunchFail(containerId, 
resource.getResourceID(), containerAllocator);
+    } else if (containerId != null) {
       log.info("Launch of container ID: {} failed on host: {}. Falling back to 
ANY_HOST", containerId, resource.getHost());
       containerAllocator.requestResource(containerId, 
ResourceRequestState.ANY_HOST);
     } else {
@@ -484,5 +494,12 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     return null;
   }
 
-
+  private void handleContainerStop(String containerID, String resourceID, 
String preferredHost, int exitStatus) {
+    if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerStop(containerID, 
resourceID, preferredHost, exitStatus, containerAllocator);
+    } else {
+      // If StandbyTasks are not enabled, we simply make a request for the 
preferredHost
+      containerAllocator.requestResource(containerID, preferredHost);
+    }
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 27d1caa..6bfa1a6 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,11 +18,14 @@
  */
 package org.apache.samza.clustermanager;
 
+
+import java.util.Optional;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This is the allocator thread that will be used by ContainerProcessManager 
when host-affinity is enabled for a job. It is similar
  * to {@link ContainerAllocator}, except that it considers locality for 
allocation.
@@ -45,11 +48,13 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
    * Tracks the expiration of a request for resources.
    */
   private final int requestTimeout;
+  private final Optional<StandbyContainerManager> standbyContainerManager;
 
   public HostAwareContainerAllocator(ClusterResourceManager manager,
-                                     int timeout, Config config, 
SamzaApplicationState state) {
+      int timeout, Config config, Optional<StandbyContainerManager> 
standbyContainerManager, SamzaApplicationState state) {
     super(manager, new ResourceRequestState(true, manager), config, state);
     this.requestTimeout = timeout;
+    this.standbyContainerManager = standbyContainerManager;
   }
 
   /**
@@ -70,7 +75,8 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
       if (hasAllocatedResource(preferredHost)) {
         // Found allocated container at preferredHost
         log.info("Found a matched-container {} on the preferred host. Running 
on {}", containerID, preferredHost);
-        runStreamProcessor(request, preferredHost);
+        // Try to launch streamProcessor on this preferredHost if all 
standby constraints are met
+        checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, 
peekAllocatedResource(preferredHost));
         state.matchedResourceRequests.incrementAndGet();
       } else {
         log.info("Did not find any allocated resources on preferred host {} 
for running container id {}",
@@ -81,14 +87,21 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
 
         if (expired) {
           updateExpiryMetrics(request);
-          if (resourceAvailableOnAnyHost) {
+
+          if (standbyContainerManager.isPresent()) {
+            
standbyContainerManager.get().handleExpiredResourceRequest(containerID, request,
+                
Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), 
this, resourceRequestState);
+
+          } else if (resourceAvailableOnAnyHost) {
             log.info("Request for container: {} on {} has expired. Running on 
ANY_HOST", request.getContainerID(), request.getPreferredHost());
             runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+
           } else {
             log.info("Request for container: {} on {} has expired. Requesting 
additional resources on ANY_HOST.", request.getContainerID(), 
request.getPreferredHost());
             resourceRequestState.cancelResourceRequest(request);
             requestResource(containerID, ResourceRequestState.ANY_HOST);
           }
+
         } else {
           log.info("Request for container: {} on {} has not yet expired. 
Request creation time: {}. Request timeout: {}",
               new Object[]{request.getContainerID(), 
request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout});
@@ -98,6 +111,7 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
     }
   }
 
+
   /**
    * Since host-affinity is enabled, all container processes will be requested 
on their preferred host. If the job is
    * run for the first time, it will get matched to any available host.
@@ -142,4 +156,15 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  private void 
checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost, SamzaResource samzaResource) {
+    // If standby tasks are not enabled run streamprocessor on the given host
+    if (!this.standbyContainerManager.isPresent()) {
+      runStreamProcessor(request, preferredHost);
+      return;
+    }
+
+    
this.standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request,
 preferredHost,
+        samzaResource, this, resourceRequestState);
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index aa7a509..0296d4c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -152,8 +152,8 @@ public class ResourceRequestState {
                * assigned to ANY_HOST
                */
               log.info("The number of containers already allocated on {} is 
greater than what was " +
-                              "requested, which is {}. Hence, saving the 
samzaResource {} in the buffer for ANY_HOST",
-                      new Object[]{hostName, requestCountOnThisHost, 
samzaResource.getResourceID()});
+                      "requested, which is {}. Hence, saving the samzaResource 
{} in the buffer for ANY_HOST",
+                  new Object[]{hostName, requestCountOnThisHost, 
samzaResource.getResourceID()});
               addToAllocatedResourceList(ANY_HOST, samzaResource);
             }
           }
@@ -238,9 +238,21 @@ public class ResourceRequestState {
    *
    * @param resource the {@link SamzaResource} to release.
    */
-  public void releaseUnstartableContainer(SamzaResource resource) {
-    log.info("Releasing unstartable container {}", resource.getResourceID());
-    manager.releaseResources(resource);
+  public void releaseUnstartableContainer(SamzaResource resource, String 
preferredHost) {
+    synchronized (lock) {
+      log.info("Releasing unstartable container {} on host {}", 
resource.getResourceID(), resource.getHost());
+      manager.releaseResources(resource);
+
+      // A reference for the resource could either be held in the preferred 
host buffer or in the ANY_HOST buffer.
+      if (allocatedResources.get(preferredHost) != null) {
+        allocatedResources.get(preferredHost).remove(resource);
+        log.info("Resource {} removed from allocated resource buffer for host 
{}", resource.getResourceID(), preferredHost);
+      }
+      if (allocatedResources.get(ANY_HOST) != null) {
+        allocatedResources.get(ANY_HOST).remove(resource);
+        log.info("Resource {} removed from allocated resource buffer for host 
{}", resource.getResourceID(), ANY_HOST);
+      }
+    }
   }
 
 
@@ -366,4 +378,4 @@ public class ResourceRequestState {
   }
 
 
-}
+}
\ No newline at end of file
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 4e6fc33..2a7b6a1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -78,7 +78,7 @@ public class SamzaApplicationState {
   public final AtomicInteger releasedContainers = new AtomicInteger(0);
 
   /**
-   * ContainerStatus of failed containers.
+   * ContainerStatuses of failed containers.
    */
   public final ConcurrentMap<String, SamzaResourceStatus> 
failedContainersStatus = new ConcurrentHashMap<String, SamzaResourceStatus>();
 
@@ -110,7 +110,7 @@ public class SamzaApplicationState {
    */
   public final ConcurrentMap<String, SamzaResource> runningContainers = new 
ConcurrentHashMap<String, SamzaResource>(0);
 
-  /**
+   /**
    * Final status of the application. Made to be volatile s.t. changes will be 
visible in multiple threads.
    */
   public volatile SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
@@ -141,6 +141,25 @@ public class SamzaApplicationState {
    */
   public final AtomicInteger redundantNotifications = new AtomicInteger(0);
 
+  /**
+   * Number of container allocations from the RM, that did not meet standby 
container constraints, in which case the
+   * existing resource was given back to the RM, and a new ANY-HOST request 
had to be made.
+   */
+  public final AtomicInteger failedStandbyAllocations = new AtomicInteger(0);
+
+  /**
+   * Number of occurrences in which a failover of an active container was 
initiated (due to a node failure), in which a
+   * running standby container was available for the failover.
+   * If two standby containers were used for one failing active, it counts as 
two.
+   */
+  public final AtomicInteger failoversToStandby = new AtomicInteger(0);
+
+  /**
+   * Number of occurrences in which a failover of an active container was 
initiated (due to a node failure), in which no
+   * running standby container was available for the failover.
+  */
+  public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
+
   public SamzaApplicationState(JobModelManager jobModelManager) {
     this.jobModelManager = jobModelManager;
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 4159ff2..df70459 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -109,11 +109,21 @@ public class SamzaResourceRequest implements 
Comparable<SamzaResourceRequest> {
   }
 
   /**
-   * Requests are ordered by the time at which they were created.
+   * Requests are ordered by the container type and the time at which they 
were created.
+   * Active containers take precedence over standby containers, regardless of 
timestamp.
    * @param o the other
    */
   @Override
   public int compareTo(SamzaResourceRequest o) {
+
+    if (!StandbyTaskUtil.isStandbyContainer(this.containerID) && 
StandbyTaskUtil.isStandbyContainer(o.containerID)) {
+      return -1;
+    }
+
+    if (StandbyTaskUtil.isStandbyContainer(this.containerID) && 
!StandbyTaskUtil.isStandbyContainer(o.containerID)) {
+      return 1;
+    }
+
     if (this.requestTimestampMs < o.requestTimestampMs)
       return -1;
     if (this.requestTimestampMs > o.requestTimestampMs)
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
new file mode 100644
index 0000000..db27238
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.storage.kv.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Encapsulates logic and state concerning standby-containers.
+ */
+public class StandbyContainerManager {
+
+  private static final Logger log = 
LoggerFactory.getLogger(StandbyContainerManager.class);
+
+  private final SamzaApplicationState samzaApplicationState;
+
+  // Map of samza containerIDs to their corresponding active and standby 
containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
+  // This is used for checking no two standbys or active-standby-pair are 
started on the same host
+  private final Map<String, List<String>> standbyContainerConstraints;
+
+  // Map of active containers that are in failover, indexed by the active 
container's resourceID (at the time of failure)
+  private final Map<String, FailoverMetadata> failovers;
+
+  // Resource-manager, used to stop containers
+  private ClusterResourceManager clusterResourceManager;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
+      ClusterResourceManager clusterResourceManager) {
+    this.failovers = new ConcurrentHashMap<>();
+    this.standbyContainerConstraints = new HashMap<>();
+    this.samzaApplicationState = samzaApplicationState;
+    JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
+
+    // populate the standbyContainerConstraints map by iterating over all 
containers
+    jobModel.getContainers()
+        .keySet()
+        .forEach(containerId -> standbyContainerConstraints.put(containerId,
+            StandbyTaskUtil.getStandbyContainerConstraints(containerId, 
jobModel)));
+    this.clusterResourceManager = clusterResourceManager;
+
+    log.info("Populated standbyContainerConstraints map {}", 
standbyContainerConstraints);
+  }
+
+  /**
+   * We handle the stopping of a container depending on the case which is 
decided using the exit-status:
+   *    Case 1. an Active-Container which has stopped for an "unknown" reason, 
then we start it on the given preferredHost (but we record the resource-request)
+   *    Case 2. Active container has stopped because of node failure, then we 
initiate a failover
+   *    Case 3. StandbyContainer has stopped after it was chosen for failover, 
see {@link StandbyContainerManager#handleStandbyContainerStop}
+   *    Case 4. StandbyContainer has stopped but not because of a failover, 
see {@link StandbyContainerManager#handleStandbyContainerStop}
+   *
+   * @param containerID containerID of the stopped container
+   * @param resourceID last resourceID of the stopped container
+   * @param preferredHost the host on which the container was running
+   * @param exitStatus the exit status of the failed container
+   * @param containerAllocator the container allocator
+   */
+  public void handleContainerStop(String containerID, String resourceID, 
String preferredHost, int exitStatus,
+      AbstractContainerAllocator containerAllocator) {
+
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      handleStandbyContainerStop(containerID, resourceID, preferredHost, 
containerAllocator);
+    } else {
+      // initiate failover for the active container based on the exitStatus
+      switch (exitStatus) {
+        case SamzaResourceStatus.DISK_FAIL:
+        case SamzaResourceStatus.ABORTED:
+        case SamzaResourceStatus.PREEMPTED:
+          initiateActiveContainerFailover(containerID, resourceID, 
containerAllocator);
+          break;
+      // in all other cases, request-resource for the failed container, but 
record the resource-request, so that
+      // if this request expires, we can do a failover -- select a standby to 
stop & start the active on standby's host
+        default:
+          log.info("Requesting resource for active-container {} on host {}", 
containerID, preferredHost);
+          SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(containerID, preferredHost);
+          FailoverMetadata failoverMetadata = 
registerActiveContainerFailure(containerID, resourceID);
+          failoverMetadata.recordResourceRequest(resourceRequest);
+          containerAllocator.issueResourceRequest(resourceRequest);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Handle the failed launch of a container, based on
+   *    Case 1. If it is an active container, then initiate a failover for it.
+   *    Case 2. If it is standby container, request a new resource on AnyHost.
+   * @param containerID the ID of the container that has failed
+   */
+  public void handleContainerLaunchFail(String containerID, String resourceID,
+      AbstractContainerAllocator containerAllocator) {
+
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling launch fail for standby-container {}, requesting 
resource on any host {}", containerID);
+      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+    } else {
+      initiateActiveContainerFailover(containerID, resourceID, 
containerAllocator);
+    }
+  }
+
+  /**
+   *  If a standby container has stopped, then there are two possible cases
+   *    Case 1. during a failover, the standby container was stopped for an 
active's start, then we
+   *       1. request a resource on the standby's host to place the 
activeContainer, and
+   *       2. request anyhost to place this standby
+   *
+   *    Case 2. independent of a failover, the standby container stopped, in 
which case we proceed with its resource-request
+   * @param standbyContainerID SamzaContainerID of the standby container
+   * @param preferredHost Preferred host of the standby container
+   */
+  private void handleStandbyContainerStop(String standbyContainerID, String 
resourceID, String preferredHost,
+      AbstractContainerAllocator containerAllocator) {
+
+    // if this standbyContainerResource was stopped for a failover, we will 
find a metadata entry
+    Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = 
this.checkIfUsedForFailover(resourceID);
+
+    if (failoverMetadata.isPresent()) {
+      String activeContainerID = failoverMetadata.get().activeContainerID;
+      String standbyContainerHostname = 
failoverMetadata.get().getStandbyContainerHostname(resourceID);
+
+      log.info("Requesting resource for active container {} on host {}, and 
backup container {} on any host",
+          activeContainerID, standbyContainerHostname, standbyContainerID);
+
+      // request standbycontainer's host for active-container
+      SamzaResourceRequest resourceRequestForActive = 
containerAllocator.getResourceRequest(activeContainerID, 
standbyContainerHostname);
+      // record the resource request, before issuing it to avoid race with 
allocation-thread
+      failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
+      containerAllocator.issueResourceRequest(resourceRequestForActive);
+
+      // request any-host for standby container
+      containerAllocator.requestResource(standbyContainerID, 
ResourceRequestState.ANY_HOST);
+    } else {
+      log.info("Issuing request for standby container {} on host {}, since 
this is not for a failover",
+          standbyContainerID, preferredHost);
+      containerAllocator.requestResource(standbyContainerID, preferredHost);
+    }
+  }
+
+  /**
+   * Handles the failover of an active container.
+   * If a usable standby is found, it is registered in the failover state and a stop is issued on it.
+   * If none is found, an any-host request is issued for the active container.
+   *
+   * @param containerID the samzaContainerID of the active-container
+   * @param resourceID  the samza-resource-ID of the container when it failed (used to index failover-state)
+   * @param containerAllocator allocator used to issue the any-host request when no standby is available
+   */
+  private void initiateActiveContainerFailover(String containerID, String resourceID,
+      AbstractContainerAllocator containerAllocator) {
+
+    Optional<Entry<String, SamzaResource>> candidate = this.selectStandby(containerID, resourceID);
+
+    // Without a standby candidate there is nothing to fail over to, so fall back to ANY_HOST
+    if (!candidate.isPresent()) {
+      log.info("No standby container found for active container {}, making a request for {}", containerID,
+          ResourceRequestState.ANY_HOST);
+      samzaApplicationState.failoversToAnyHost.incrementAndGet();
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      return;
+    }
+
+    Entry<String, SamzaResource> chosen = candidate.get();
+    String idOfStandby = chosen.getKey();
+    SamzaResource resourceOfStandby = chosen.getValue();
+    String resourceIdOfStandby = resourceOfStandby.getResourceID();
+    String hostOfStandby = resourceOfStandby.getHost();
+
+    // remember which standby resource (and host) was picked for this active container's failure
+    this.registerActiveContainerFailure(containerID, resourceID)
+        .updateStandbyContainer(resourceIdOfStandby, hostOfStandby);
+
+    log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, "
+        + "for active container {}", idOfStandby, resourceIdOfStandby, containerID);
+    samzaApplicationState.failoversToStandby.incrementAndGet();
+    this.clusterResourceManager.stopStreamProcessor(resourceOfStandby);
+  }
+
+  /**
+   * Method to select a standby container for a given active container that has stopped.
+   * TODO: enrich this method to select standby's intelligently based on lag, timestamp, load-balancing, etc.
+   *
+   * @param activeContainerID Samza containerID of the active container
+   * @param activeContainerResourceID ResourceID of the active container at the time of its last failure;
+   *                                  when null, no prior failover metadata is consulted
+   * @return the (standby containerID, resource) pair of a running standby whose resource has not already been used
+   *         for a failover of this active resource, or an empty Optional if there is none
+   */
+  private Optional<Entry<String, SamzaResource>> selectStandby(String activeContainerID,
+      String activeContainerResourceID) {
+
+    List<String> standbyContainerIDs = this.standbyContainerConstraints.get(activeContainerID);
+    log.info("Standby containers {} for active container {}", standbyContainerIDs, activeContainerID);
+
+    // obtain any existing failover metadata
+    Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata =
+        activeContainerResourceID == null ? Optional.empty() : this.getFailoverMetadata(activeContainerResourceID);
+
+    // Iterate over the list of standby containers, to find a running standby resource that we have not already
+    // used for a failover for this active resource
+    for (String standbyContainerID : standbyContainerIDs) {
+
+      // A single lookup instead of containsKey() + get(): the container may leave the running state between
+      // the two calls, which would hand a null resource to the code below.
+      SamzaResource standbyContainerResource = samzaApplicationState.runningContainers.get(standbyContainerID);
+      if (standbyContainerResource == null) {
+        continue;
+      }
+
+      // use this standby if there was no previous failover for which this standbyResource was used
+      boolean alreadyUsed = failoverMetadata.isPresent()
+          && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID());
+      if (!alreadyUsed) {
+        log.info("Returning standby container {} in running state for active container {}", standbyContainerID,
+            activeContainerID);
+        return Optional.of(new Entry<>(standbyContainerID, standbyContainerResource));
+      }
+    }
+
+    log.info("Did not find any running standby container for active container {}", activeContainerID);
+    return Optional.empty();
+  }
+
+  /**
+   * Register the failure of an active container (identified by its resource ID).
+   * An existing metadata entry for the resource ID is reused; otherwise a new one is created and stored.
+   *
+   * @param activeContainerID Samza containerID of the failed active container
+   * @param activeContainerResourceID resource ID of the failed active container, used as the key
+   * @return the failover metadata registered under the given resource ID
+   */
+  private FailoverMetadata registerActiveContainerFailure(String activeContainerID, String activeContainerResourceID) {
+
+    FailoverMetadata metadata = failovers.get(activeContainerResourceID);
+    if (metadata == null) {
+      // first failure seen for this resource ID
+      metadata = new FailoverMetadata(activeContainerID, activeContainerResourceID);
+    }
+    this.failovers.put(activeContainerResourceID, metadata);
+    return metadata;
+  }
+
+  /**
+   * Looks for a failover-metadata entry in which the given standby resource was selected for failover.
+   * This is used to determine whether we requested the stop of that standby container.
+   *
+   * @param standbyContainerResourceId resource ID of the standby container; may be null
+   * @return the first matching failover metadata, or an empty Optional if the ID is null or nothing matches
+   */
+  private Optional<FailoverMetadata> checkIfUsedForFailover(String standbyContainerResourceId) {
+
+    if (standbyContainerResourceId == null) {
+      return Optional.empty();
+    }
+
+    Optional<FailoverMetadata> match = failovers.values().stream()
+        .filter(metadata -> metadata.isStandbyResourceUsed(standbyContainerResourceId))
+        .findFirst();
+
+    match.ifPresent(metadata ->
+        log.info("Standby container with resource id {} was selected for failover of active container {}",
+            standbyContainerResourceId, metadata.activeContainerID));
+    return match;
+  }
+
+  /**
+   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container constraints.
+   * The constraint is violated when any container listed in the standby constraints of the requested container is
+   * pending for launch, or running, on the host of the given resource.
+   *
+   * @param request The resource request to match.
+   * @param samzaResource The samzaResource to potentially match the resource to.
+   * @return true if no conflicting container is pending or running on the resource's host, false otherwise
+   */
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource) {
+    String candidateID = request.getContainerID();
+    String targetHost = samzaResource.getHost();
+    List<String> conflictingIDs = this.standbyContainerConstraints.get(candidateID);
+
+    for (String conflictingID : conflictingIDs) {
+
+      // a conflicting container that is pending for launch on the host rules the host out
+      SamzaResource pending = samzaApplicationState.pendingContainers.get(conflictingID);
+      if (pending != null && pending.getHost().equals(targetHost)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            candidateID, samzaResource.getHost(), conflictingID);
+        return false;
+      }
+
+      // so does a conflicting container that is already running on the host
+      SamzaResource running = samzaApplicationState.runningContainers.get(conflictingID);
+      if (running != null && running.getHost().equals(targetHost)) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+            candidateID, samzaResource.getHost(), conflictingID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   *  Attempt to the run a container on the given candidate resource, if doing 
so meets the standby container constraints.
+   * @param request The Samza container request
+   * @param preferredHost the preferred host associated with the container
+   * @param samzaResource the resource candidate
+   */
+  public void 
checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost,
+      SamzaResource samzaResource, AbstractContainerAllocator 
containerAllocator,
+      ResourceRequestState resourceRequestState) {
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource)) {
+      // This resource can be used to launch this container
+      log.info("Running container {} on {} meets standby constraints, 
preferredHost = {}", containerID, samzaResource.getHost(), preferredHost);
+      containerAllocator.runStreamProcessor(request, preferredHost);
+    } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      // This resource cannot be used to launch this standby container, so we 
make a new anyhost request
+      log.info("Running standby container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      samzaApplicationState.failedStandbyAllocations.incrementAndGet();
+    } else {
+      // This resource cannot be used to launch this active container 
container, so we initiate a failover
+      log.warn("Running active container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+
+      Optional<FailoverMetadata> failoverMetadata = 
getFailoverMetadata(request);
+
+      // if this active-container has never failed, then simple request anyhost
+      if (!failoverMetadata.isPresent()) {
+        log.info("Requesting ANY_HOST for active container {}", containerID);
+        containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      } else {
+        log.info("Initiating failover for active container {}", containerID);
+        // we use the activeContainer's last resourceID to initiate the 
failover
+        String lastKnownResourceID = 
failoverMetadata.get().activeContainerResourceID;
+        initiateActiveContainerFailover(containerID, lastKnownResourceID, 
containerAllocator);
+      }
+
+      samzaApplicationState.failedStandbyAllocations.incrementAndGet();
+    }
+  }
+
+  /**
+   * Handle an expired resource request, dispatching on whether it was made for a standby or an active container.
+   *
+   * @param containerID the containerID for which the resource request was made
+   * @param request the expired resource request
+   * @param alternativeResource an alternative, already-allocated, resource (if available)
+   * @param containerAllocator the container allocator (used to issue any required subsequent resource requests)
+   * @param resourceRequestState used to cancel resource requests if required.
+   */
+  public void handleExpiredResourceRequest(String containerID, SamzaResourceRequest request,
+      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+
+    if (!StandbyTaskUtil.isStandbyContainer(containerID)) {
+      handleExpiredRequestForActiveContainer(containerID, request, alternativeResource, containerAllocator, resourceRequestState);
+      return;
+    }
+
+    handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator, resourceRequestState);
+  }
+
+  // Handles an expired resource request that was made for placing a standby container: either try the
+  // alternative resource (subject to the standby constraints), or cancel and re-request on any host.
+  private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
+      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+
+    if (!alternativeResource.isPresent()) {
+      // No alternative-resource for the standby container, so a new anyhost request is made
+      log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
+      resourceRequestState.cancelResourceRequest(request);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      return;
+    }
+
+    // The standby container can be started on the anyhost-alternative-resource right away, provided it passes
+    // all the standby constraints
+    SamzaResource alternative = alternativeResource.get();
+    log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID, alternative);
+    checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(),
+        containerAllocator, resourceRequestState);
+  }
+
+  // Handle an expired resource request that was made for placing an active container.
+  // Exactly one of three cases applies:
+  //   1. the request belongs to a failover: cancel it and fail over to a new standby-candidate;
+  //   2. no failover, alternative resource available: try to run on it (subject to the standby constraints);
+  //   3. no failover, no alternative resource: cancel the request and make a new anyhost request.
+  // The conditions are exhaustive, so no "invalid state" fallback branch is needed.
+  private void handleExpiredRequestForActiveContainer(String containerID, SamzaResourceRequest request,
+      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+
+    Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
+
+    if (failoverMetadata.isPresent()) {
+      // An active container that had failed, and whose subsequent resource request has expired, needs to be
+      // failed over to a new standby-candidate, so we initiate a failover
+      log.info("Handling expired request, initiating failover for active container {}", containerID);
+
+      resourceRequestState.cancelResourceRequest(request);
+
+      // we use the activeContainer's resourceID to initiate the failover
+      String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID;
+      initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator);
+
+    } else if (alternativeResource.isPresent()) {
+      // There is no failoverMetadata associated with this resource-request, so the active container can be
+      // started on the alternative-any-host resource right away
+      log.info("Handling expired request, trying to run active container {} on alternative resource {}", containerID, alternativeResource.get());
+
+      checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(),
+          containerAllocator, resourceRequestState);
+
+    } else {
+      // No failoverMetadata and no alternative-anyhost resource, so we make a new anyhost request
+      log.info("Handling expired request, requesting anyHost resource for active container {} because this active container has never failed", containerID);
+
+      resourceRequestState.cancelResourceRequest(request);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+
+  /**
+   * Check if an activeContainerResource has failover-metadata associated with it.
+   *
+   * @param activeContainerResourceID resource ID of the active container
+   * @return the failover metadata registered under the resource ID, or an empty Optional if there is none
+   */
+  private Optional<FailoverMetadata> getFailoverMetadata(String activeContainerResourceID) {
+    // One lookup instead of containsKey() + get(): if the entry vanished between the two calls,
+    // Optional.of(null) would throw a NullPointerException.
+    return Optional.ofNullable(this.failovers.get(activeContainerResourceID));
+  }
+
+  /**
+   * Check if a SamzaResourceRequest was issued for a failover.
+   *
+   * @param resourceRequest the resource request to look up
+   * @return the first failover metadata that recorded the request, or an empty Optional if none did
+   */
+  private Optional<FailoverMetadata> getFailoverMetadata(SamzaResourceRequest resourceRequest) {
+    return this.failovers.values().stream()
+        .filter(metadata -> metadata.containsResourceRequest(resourceRequest))
+        .findFirst();
+  }
+
+
+  /**
+   * Returns the string form of the failover state ({@code failovers}) tracked by this manager.
+   */
+  @Override
+  public String toString() {
+    return this.failovers.toString();
+  }
+
+  /**
+   * Encapsulates metadata concerning the failover of an active container.
+   * Every method that touches the mutable collections (including {@link #toString()}) synchronizes on this instance.
+   */
+  public class FailoverMetadata {
+    public final String activeContainerID;
+    public final String activeContainerResourceID;
+
+    // Map of samza-container-resource ID to host, for each standby container selected for failover of the
+    // activeContainer
+    private final Map<String, String> selectedStandbyContainers;
+
+    // Resource requests issued during this failover
+    private final Set<SamzaResourceRequest> resourceRequests;
+
+    /**
+     * @param activeContainerID Samza containerID of the active container being failed over
+     * @param activeContainerResourceID resource ID of the active container
+     */
+    public FailoverMetadata(String activeContainerID, String activeContainerResourceID) {
+      this.activeContainerID = activeContainerID;
+      this.activeContainerResourceID = activeContainerResourceID;
+      this.selectedStandbyContainers = new HashMap<>();
+      this.resourceRequests = new HashSet<>();
+    }
+
+    // Check if this standbyContainerResourceID was used in this failover
+    public synchronized boolean isStandbyResourceUsed(String standbyContainerResourceID) {
+      return this.selectedStandbyContainers.containsKey(standbyContainerResourceID);
+    }
+
+    // Get the hostname corresponding to the standby resourceID (null if the resource was not used in this failover)
+    public synchronized String getStandbyContainerHostname(String standbyContainerResourceID) {
+      return this.selectedStandbyContainers.get(standbyContainerResourceID);
+    }
+
+    // Add the standbyContainer resource to the list of standbyContainers used in this failover
+    public synchronized void updateStandbyContainer(String standbyContainerResourceID, String standbyContainerHost) {
+      this.selectedStandbyContainers.put(standbyContainerResourceID, standbyContainerHost);
+    }
+
+    // Add the samzaResourceRequest to the list of resource requests associated with this failover
+    public synchronized void recordResourceRequest(SamzaResourceRequest samzaResourceRequest) {
+      this.resourceRequests.add(samzaResourceRequest);
+    }
+
+    // Check if this resource request is used for this failover
+    public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) {
+      return this.resourceRequests.contains(samzaResourceRequest);
+    }
+
+    // Synchronized like the other accessors: rendering the collections iterates over them, which must not
+    // overlap with a concurrent updateStandbyContainer/recordResourceRequest.
+    @Override
+    public synchronized String toString() {
+      return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: "
+          + this.activeContainerResourceID + " selectedStandbyContainers:" + selectedStandbyContainers
+          + " resourceRequests: " + resourceRequests + "]";
+    }
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java
new file mode 100644
index 0000000..c993445
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+
+
+/**
+ * Collection of util methods used for performing Standby-aware Container 
allocation in YARN.
+ */
+public class StandbyTaskUtil {
+  private static final String STANDBY_CONTAINER_ID_SEPARATOR = "-";
+  private static final String TASKNAME_SEPARATOR = "-";
+  private static final String STANDBY_TASKNAME_PREFIX = "Standby";
+
+  /**
+   * Returns true if the containerName implies a standby container, false 
otherwise.
+   * @param containerID The desired containerID
+   * @return
+   */
+  public static boolean isStandbyContainer(String containerID) {
+    return containerID.contains(STANDBY_CONTAINER_ID_SEPARATOR);
+  }
+
+  // Helper method to generate buddy containerIDs by appending the 
replica-number to the active-container's id.
+  public final static String getStandbyContainerId(String activeContainerId, 
int replicaNumber) {
+    return 
activeContainerId.concat(STANDBY_CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
+  }
+
+  // Helper method to generate active container's ID by removing the 
replica-number from the standby container's id.
+  public final static String getActiveContainerId(String standbyContainerID) {
+    return standbyContainerID.split(STANDBY_CONTAINER_ID_SEPARATOR)[0];
+  }
+
+  // Helper method to get the standby task name by prefixing "Standby" to the 
corresponding active task's name.
+  public final static TaskName getStandbyTaskName(TaskName activeTaskName, int 
replicaNum) {
+    return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
+        .concat(activeTaskName.getTaskName())
+        .concat(TASKNAME_SEPARATOR)
+        .concat(String.valueOf(replicaNum)));
+  }
+
+  // Helper method to get the active task name by stripping the prefix 
"Standby" from the standby task name.
+  public final static TaskName getActiveTaskName(TaskName standbyTaskName) {
+    return new 
TaskName(standbyTaskName.getTaskName().split(TASKNAME_SEPARATOR)[1]);
+  }
+
+  /**
+   *  Given a containerID and job model, it returns the containerids of all 
containers that either have
+   *  a. standby tasks corresponding to active tasks on the given container, or
+   *  b. have active tasks corresponding to standby tasks on the given 
container.
+   *  This is used to ensure that an active task and all its corresponding 
standby tasks are on separate hosts, and
+   *  standby tasks corresponding to the same active task are on separate 
hosts.
+   */
+  public static List<String> getStandbyContainerConstraints(String 
containerID, JobModel jobModel) {
+
+    ContainerModel givenContainerModel = 
jobModel.getContainers().get(containerID);
+    List<String> containerIDsWithStandbyConstraints = new ArrayList<>();
+
+    // iterate over all containerModels in the jobModel
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+
+      // add to list if active and standby tasks on the two containerModels 
overlap
+      if (!givenContainerModel.equals(containerModel) && 
checkTaskOverlap(givenContainerModel, containerModel)) {
+        containerIDsWithStandbyConstraints.add(containerModel.getId());
+      }
+    }
+    return containerIDsWithStandbyConstraints;
+  }
+
+  // Helper method that checks if tasks on the two containerModels overlap
+  private static boolean checkTaskOverlap(ContainerModel containerModel1, 
ContainerModel containerModel2) {
+    Set<TaskName> activeTasksOnContainer1 = 
getCorrespondingActiveTasks(containerModel1);
+    Set<TaskName> activeTasksOnContainer2 = 
getCorrespondingActiveTasks(containerModel2);
+    return !Collections.disjoint(activeTasksOnContainer1, 
activeTasksOnContainer2);
+  }
+
+  // Helper method that returns the active tasks corresponding to all standby 
tasks on a container, including any already-active tasks on the container
+  private static Set<TaskName> getCorrespondingActiveTasks(ContainerModel 
containerModel) {
+    Set<TaskName> tasksInActiveMode = getAllTasks(containerModel, 
TaskMode.Active);
+    tasksInActiveMode.addAll(getAllTasks(containerModel, 
TaskMode.Standby).stream()
+        .map(taskName -> getActiveTaskName(taskName))
+        .collect(Collectors.toSet()));
+    return tasksInActiveMode;
+  }
+
+  // Helper method to getAllTaskModels of this container in the given taskMode
+  private static Set<TaskName> getAllTasks(ContainerModel containerModel, 
TaskMode taskMode) {
+    return containerModel.getTasks()
+        .values()
+        .stream()
+        .filter(e -> e.getTaskMode().equals(taskMode))
+        .map(taskModel -> taskModel.getTaskName())
+        .collect(Collectors.toSet());
+  }
+
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
index c7d556a..5fd9ceb 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.samza.clustermanager.StandbyTaskUtil;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskMode;
@@ -60,9 +61,6 @@ import org.slf4j.LoggerFactory;
 public class TaskNameGrouperProxy {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskNameGrouperProxy.class);
-  private static final String CONTAINER_ID_SEPARATOR = "-";
-  private static final String TASKNAME_SEPARATOR = "-";
-  private static final String STANDBY_TASKNAME_PREFIX = "Standby";
   private final TaskNameGrouper taskNameGrouper;
   private final boolean standbyTasksEnabled;
   private final int replicationFactor;
@@ -104,7 +102,7 @@ public class TaskNameGrouperProxy {
 
     for (ContainerModel activeContainer : containerModels) {
       for (int replicaNum = 0; replicaNum < replicationFactor - 1; 
replicaNum++) {
-        String buddyContainerId = getBuddyContainerId(activeContainer.getId(), 
replicaNum);
+        String buddyContainerId = 
StandbyTaskUtil.getStandbyContainerId(activeContainer.getId(), replicaNum);
 
         ContainerModel buddyContainerModel =
             new ContainerModel(buddyContainerId, 
getTaskModelForBuddyContainer(activeContainer.getTasks(), replicaNum));
@@ -124,7 +122,7 @@ public class TaskNameGrouperProxy {
     Map<TaskName, TaskModel> standbyTaskModels = new HashMap<>();
 
     for (TaskName taskName : activeContainerTaskModel.keySet()) {
-      TaskName standbyTaskName = getStandbyTaskName(taskName, replicaNum);
+      TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, 
replicaNum);
       TaskModel standbyTaskModel =
           new TaskModel(standbyTaskName, 
activeContainerTaskModel.get(taskName).getSystemStreamPartitions(),
               activeContainerTaskModel.get(taskName).getChangelogPartition(), 
TaskMode.Standby);
@@ -135,17 +133,4 @@ public class TaskNameGrouperProxy {
         activeContainerTaskModel);
     return standbyTaskModels;
   }
-
-  // Helper method to generate buddy containerIDs by appending the 
replica-number to the active-container's id.
-  private final static String getBuddyContainerId(String activeContainerId, 
int replicaNumber) {
-    return 
activeContainerId.concat(CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
-  }
-
-  // Helper method to get the standby task name by prefixing "Standby" to the 
corresponding active task's name.
-  private final static TaskName getStandbyTaskName(TaskName activeTaskName, 
int replicaNum) {
-    return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
-        .concat(activeTaskName.getTaskName())
-        .concat(TASKNAME_SEPARATOR)
-        .concat(String.valueOf(replicaNum)));
-  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 42f7f4b..e662fa1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import java.util.stream.Collectors;
+import org.apache.samza.clustermanager.StandbyTaskUtil;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -204,6 +205,7 @@ public class StorageManagerUtil {
 
   /**
    * Creates and returns a File pointing to the directory for the given store 
and task, given a particular base directory.
+   * In case of a standby task (TaskMode.Standby), the storeDirectory is the 
same as it would be for an active task.
    *
    * @param storeBaseDir the base directory to use
    * @param storeName the store name to use
@@ -212,7 +214,10 @@ public class StorageManagerUtil {
    * @return the partition directory for the store
    */
   public static File getStorePartitionDir(File storeBaseDir, String storeName, 
TaskName taskName, TaskMode taskMode) {
-    // TODO: use task-Mode to decide the storePartitionDir -- standby's dir 
should be the same as active
-    return new File(storeBaseDir, (storeName + File.separator + 
taskName.toString()).replace(' ', '_'));
+    TaskName taskNameForDirName = taskName;
+    if (taskMode.equals(TaskMode.Standby)) {
+      taskNameForDirName =  StandbyTaskUtil.getActiveTaskName(taskName);
+    }
+    return new File(storeBaseDir, (storeName + File.separator + 
taskNameForDirName.toString()).replace(' ', '_'));
   }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 15cb18f..265e56f 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -69,6 +69,10 @@ class ContainerProcessManagerMetrics(
           }
       })
 
+     val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () 
=> state.failedStandbyAllocations.get())
+     val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => 
state.failoversToAnyHost.get())
+     val mFailoversToStandby = newGauge("failovers-to-standby", () => 
state.failoversToStandby.get())
+
     jvm.start
     reporters.values.foreach(_.start)
   }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 471c7fe..4545a75 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -93,6 +93,11 @@ public class MockClusterResourceManager extends ClusterResourceManager {
     launchCountSemaphore.release();
   }
 
+  @Override
+  public void stopStreamProcessor(SamzaResource resource) {
+    // no op
+  }
+
   public void registerContainerListener(MockContainerListener listener) {
     mockContainerListeners.add(listener);
   }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
index 3aa58b2..676018b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
@@ -70,8 +70,8 @@ public class MockContainerRequestState extends ResourceRequestState {
   }
 
   @Override
-  public void releaseUnstartableContainer(SamzaResource container) {
-    super.releaseUnstartableContainer(container);
+  public void releaseUnstartableContainer(SamzaResource container, String 
preferredHost) {
+    super.releaseUnstartableContainer(container, preferredHost);
 
     numReleasedContainers += 1;
     for (MockContainerListener listener : mockContainerListeners) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
index ea05fff..1285c5f 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Optional;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
@@ -31,7 +32,7 @@ public class MockHostAwareContainerAllocator extends HostAwareContainerAllocator
 
   public MockHostAwareContainerAllocator(ClusterResourceManager manager,
       Config config, SamzaApplicationState state) {
-    super(manager, ALLOCATOR_TIMEOUT_MS, config, state);
+    super(manager, ALLOCATOR_TIMEOUT_MS, config, Optional.empty(), state);
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index d648a80..841e5ba 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -418,7 +418,7 @@ public class TestContainerProcessManager {
   public void testAllBufferedResourcesAreUtilized() throws Exception {
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfigWithHostAffinity());
-    config.put("cluster-manager.container.count", "2");
+    config.put("job.container.count", "2");
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
     state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host1",
@@ -475,6 +475,7 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(resource2);
     taskManager.onStreamProcessorLaunchSuccess(resource3);
 
     assertTrue(state.jobHealthy.get());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 3e5e785..fd3b452 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -73,7 +74,7 @@ public class TestHostAwareContainerAllocator {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, 
state);
+    containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, 
Optional.empty(), state);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
     Field requestStateField = 
containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
new file mode 100644
index 0000000..c075f14
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestStandbyAllocator {
+
+  @Test
+  public void testWithNoStandby() {
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1);
+    List<String> containerConstraints = 
StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
+    Assert.assertEquals("Constrained container count should be 0", 0, 
containerConstraints.size());
+  }
+
+  @Test
+  public void testWithStandby() {
+    testWithStandby(2, 1, 2);
+    testWithStandby(10, 1, 2);
+
+    testWithStandby(2, 10, 2);
+    testWithStandby(2, 10, 4);
+
+    testWithStandby(10, 1, 4);
+    testWithStandby(10, 10, 4);
+  }
+
+
+  public void testWithStandby(int nContainers, int nTasks, int 
replicationFactor) {
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, 
replicationFactor);
+
+    for (String containerID : jobModel.getContainers().keySet()) {
+      List<String> containerConstraints = 
StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
+
+      Assert.assertTrue("Constrained container should be valid containers",
+          jobModel.getContainers().keySet().containsAll(containerConstraints));
+
+      Assert.assertEquals("Constrained container count should be (replication 
factor-1)", replicationFactor - 1,
+          containerConstraints.size());
+
+      Assert.assertFalse("Constrained containers list should not have the 
container itself",
+          containerConstraints.contains(containerID));
+
+      containerConstraints.forEach(containerConstraintID -> {
+          Assert.assertTrue("Constrained containers IDs should correspond to 
the active container",
+              
containerID.split("-")[0].equals(containerConstraintID.split("-")[0]));
+        });
+    }
+  }
+
+  // Helper method to create a jobmodel with given number of containers, tasks 
and replication factor
+  private JobModel getJobModelWithStandby(int nContainers, int nTasks, int 
replicationFactor) {
+    Map<String, ContainerModel> containerModels = new HashMap<>();
+    int taskID = 0;
+
+    for (int j = 0; j < nContainers; j++) {
+      Map<TaskName, TaskModel> tasks = new HashMap<>();
+      for (int i = 0; i < nTasks; i++) {
+        TaskModel taskModel = getTaskModel(taskID++);
+        tasks.put(taskModel.getTaskName(), taskModel);
+      }
+      containerModels.put(String.valueOf(j), new 
ContainerModel(String.valueOf(j), tasks));
+    }
+
+    Map<String, ContainerModel> standbyContainerModels = new HashMap<>();
+    for (int i = 0; i < replicationFactor - 1; i++) {
+      for (String containerID : containerModels.keySet()) {
+        String standbyContainerId = 
StandbyTaskUtil.getStandbyContainerId(containerID, i);
+        Map<TaskName, TaskModel> standbyTasks = 
getStandbyTasks(containerModels.get(containerID).getTasks(), i);
+        standbyContainerModels.put(standbyContainerId, new 
ContainerModel(standbyContainerId, standbyTasks));
+      }
+    }
+
+    containerModels.putAll(standbyContainerModels);
+    return new JobModel(new MapConfig(), containerModels);
+  }
+
+  // Helper method that creates a taskmodel with one input ssp
+  private static TaskModel getTaskModel(int partitionNum) {
+    return new TaskModel(new TaskName("Partition " + partitionNum),
+        Collections.singleton(new SystemStreamPartition("test-system", 
"test-stream", new Partition(partitionNum))),
+        new Partition(partitionNum), TaskMode.Active);
+  }
+
+  // Helper method to create standby-taskModels from active-taskModels
+  private Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel> 
tasks, int replicaNum) {
+    Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
+    tasks.forEach((taskName, taskModel) -> {
+        TaskName standbyTaskName = 
StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
+        standbyTasks.put(standbyTaskName,
+            new TaskModel(standbyTaskName, 
taskModel.getSystemStreamPartitions(), taskModel.getChangelogPartition(),
+                TaskMode.Standby));
+      });
+    return standbyTasks;
+  }
+}
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 53b61d9..6d0fdb1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -227,7 +227,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   @Override
   public void requestResources(SamzaResourceRequest resourceRequest) {
-    log.info("Requesting resources on  " + resourceRequest.getPreferredHost() 
+ " for container " + resourceRequest.getContainerID());
+    log.info("Requesting resources on " + resourceRequest.getPreferredHost() + 
" for container " + resourceRequest.getContainerID());
 
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
@@ -314,6 +314,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     }
   }
 
+  public void stopStreamProcessor(SamzaResource resource) {
+    synchronized (lock) {
+      log.info("Stopping resource {}", resource);
+      
this.nmClientAsync.stopContainerAsync(allocatedResources.get(resource).getId(),
+          allocatedResources.get(resource).getNodeId());
+    }
+  }
+
   /**
    * Given a lookupContainerId from Yarn (for example: containerId_app_12345, 
this method returns the SamzaContainer ID
    * in the range [0,N-1] that maps to it.
@@ -520,7 +528,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
 
   @Override
   public void onContainerStopped(ContainerId containerId) {
-    log.info("Got a notification from the NodeManager for a stopped container. 
ContainerId: {}", containerId);
+    log.info("Got a notification from the NodeManager for a stopped container. 
ContainerId: {} samzaContainerId {}",
+        containerId, getIDForContainer(containerId.toString()));
   }
 
   @Override
@@ -549,6 +558,19 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   @Override
   public void onStopContainerError(ContainerId containerId, Throwable t) {
     log.info("Got an error when stopping container from the NodeManager. 
ContainerId: {}. Error: {}", containerId, t);
+    String samzaContainerId = getIDForContainer(containerId.toString());
+
+    if (samzaContainerId != null) {
+      YarnContainer container = 
state.runningYarnContainers.get(samzaContainerId);
+      log.info("Failed Stop on Yarn Container: {} had Samza ContainerId: {} ", 
containerId.toString(), samzaContainerId);
+      SamzaResource resource = new 
SamzaResource(container.resource().getVirtualCores(),
+          container.resource().getMemory(), container.nodeId().getHost(), 
containerId.toString());
+
+      log.info("Re-invoking stop stream processor for container: {}", 
containerId);
+      this.stopStreamProcessor(resource);// For now, we retry the stopping of 
the container
+    } else {
+      log.info("Got an invalid notification for container: {}", 
containerId.toString());
+    }
   }
 
   /**

Reply via email to