http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
new file mode 100644
index 0000000..3d1560f
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Specification of a request for resources from a ClusterResourceManager. A
+ * resource request currently includes cpu cores and memory in MB. A preferred
+ * host can also be specified with a request.
+ *
+ * When used with an ordered data structure (for example, a priority queue),
+ * ordering between two SamzaResourceRequests is defined by their timestamp.
+ *
+ * //TODO: Define a SamzaResourceRequestBuilder API as specified in SAMZA-881
+ */
+public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
+  private static final Logger log = LoggerFactory.getLogger(SamzaResourceRequest.class);
+
+  /** Number of cpu cores asked for by this request. */
+  private final int numCores;
+
+  /** Amount of memory asked for by this request, in MB. */
+  private final int memoryMB;
+
+  /**
+   * The preferred host on which the resource must be allocated. Can be set to
+   * ContainerRequestState.ANY_HOST if there are no host preferences.
+   */
+  private final String preferredHost;
+
+  /** Unique identifier of this request; a random UUID rendered as a string. */
+  private final String requestID;
+
+  /** The ID of the StreamProcessor which this request is for. */
+  private final int containerID;
+
+  /** The timestamp in millis when the request was created. */
+  private final long requestTimestampMs;
+
+  /**
+   * Builds a request, stamps it with a freshly generated random UUID and the
+   * current time in millis, and logs its creation at info level.
+   *
+   * @param numCores number of cpu cores to request
+   * @param memoryMB amount of memory to request, in MB
+   * @param preferredHost the preferred host for the resource
+   * @param expectedContainerID ID of the StreamProcessor this request is for
+   */
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, int expectedContainerID) {
+    this.numCores = numCores;
+    this.memoryMB = memoryMB;
+    this.preferredHost = preferredHost;
+    this.requestID = UUID.randomUUID().toString();
+    this.containerID = expectedContainerID;
+    this.requestTimestampMs = System.currentTimeMillis();
+
+    final Object[] logArgs = {this.containerID, this.preferredHost, this.requestTimestampMs};
+    log.info("Resource Request created for {} on {} at {}", logArgs);
+  }
+
+  /** @return number of cpu cores asked for */
+  public int getNumCores() {
+    return numCores;
+  }
+
+  /** @return amount of memory asked for, in MB */
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  /** @return the preferred host given at construction time */
+  public String getPreferredHost() {
+    return preferredHost;
+  }
+
+  /** @return the unique identifier generated for this request */
+  public String getRequestID() {
+    return requestID;
+  }
+
+  /** @return ID of the StreamProcessor this request is for */
+  public int getContainerID() {
+    return containerID;
+  }
+
+  /** @return creation time of this request, in millis */
+  public long getRequestTimestampMs() {
+    return requestTimestampMs;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("SamzaResourceRequest{");
+    text.append("numCores=").append(numCores);
+    text.append(", memoryMB=").append(memoryMB);
+    text.append(", preferredHost='").append(preferredHost).append('\'');
+    text.append(", requestID='").append(requestID).append('\'');
+    text.append(", containerID=").append(containerID);
+    text.append(", requestTimestampMs=").append(requestTimestampMs);
+    text.append('}');
+    return text.toString();
+  }
+
+  /**
+   * Requests are ordered by the time at which they were created.
+   *
+   * @param o the other request
+   * @return -1 if this request is older than {@code o}, 1 if it is newer, 0 if both
+   *         carry the same timestamp
+   */
+  @Override
+  public int compareTo(SamzaResourceRequest o) {
+    final long mine = this.requestTimestampMs;
+    final long theirs = o.requestTimestampMs;
+    return mine < theirs ? -1 : (mine > theirs ? 1 : 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
new file mode 100644
index 0000000..7e16ce1
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+/**
+ * <p><code>SamzaResourceStatus</code> represents the current status of a
+ * <code>StreamProcessor</code> and the resource it is on.</p>
+ *
+ * <p>It provides details such as:</p>
+ * <ul>
+ *   <li><code>resourceID</code> of the resource.</li>
+ *   <li><em>Exit status</em> of the StreamProcessor.</li>
+ *   <li><em>Diagnostic</em> message for a failed/pre-empted StreamProcessor.</li>
+ * </ul>
+ *
+ * <p>The exact semantics of various exit codes and failure modes are evolving.
+ * Currently the following failures are handled: termination of a process running
+ * in the resource, resource preemption, disk failures on host.</p>
+ */
+public final class SamzaResourceStatus {
+  /**
+   * Indicates that the StreamProcessor on the resource successfully completed.
+   */
+  public static final int SUCCESS = 0;
+
+  /**
+   * Indicates the failure of the StreamProcessor running on the resource.
+   */
+  public static final int ABORTED = -100;
+
+  /**
+   * Indicates a disk failure in the host the resource is on.
+   * Currently these are modelled after Yarn, could evolve as we add integrations
+   * with many cluster managers.
+   */
+  public static final int DISK_FAIL = -101;
+
+  /**
+   * Indicates that the resource was preempted (given to another processor) by
+   * the cluster manager.
+   */
+  public static final int PREEMPTED = -102;
+
+  private final String resourceID;
+  private final String diagnostics;
+  private final int exitCode;
+
+  /**
+   * @param resourceID identifier of the resource this status refers to
+   * @param diagnostics diagnostic message attached to this status
+   * @param exitCode exit code attached to this status
+   */
+  public SamzaResourceStatus(String resourceID, String diagnostics, int exitCode) {
+    this.resourceID = resourceID;
+    this.diagnostics = diagnostics;
+    this.exitCode = exitCode;
+  }
+
+  /** @return identifier of the resource this status refers to */
+  public String getResourceID() {
+    return resourceID;
+  }
+
+  /** @return the diagnostic message given at construction time */
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  /** @return the exit code given at construction time */
+  public int getExitCode() {
+    return exitCode;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("SamzaResourceStatus{");
+    text.append("resourceID='").append(resourceID).append('\'');
+    text.append(", diagnostics='").append(diagnostics).append('\'');
+    text.append(", exitCode=").append(exitCode);
+    text.append('}');
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
new file mode 100644
index 0000000..dafd7a7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.samza.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configs when Samza is used with a ClusterManager like Yarn or Mesos. Some of these configs
+ * were originally defined in the yarn namespace. These will be moved to the "cluster-manager"
+ * namespace. For now, both configs will be honored with the cluster-manager.* configs taking
+ * precedence. There will be a deprecated config warning when old configs are used.
+ * Later, we'll enforce the new configs.
+ */
+public class ClusterManagerConfig extends MapConfig {
+
+  private static final Logger log = LoggerFactory.getLogger(ClusterManagerConfig.class);
+
+  private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
+  private static final String CLUSTER_MANAGER_FACTORY_DEFAULT =
+      "org.apache.samza.job.yarn.YarnResourceManagerFactory";
+
+  /**
+   * Sleep interval for the allocator thread in milliseconds
+   */
+  private static final String ALLOCATOR_SLEEP_MS = "cluster-manager.allocator.sleep.ms";
+  public static final String YARN_ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";
+  private static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600;
+
+  /**
+   * Number of milliseconds before a container request is considered to have expired
+   */
+  public static final String CONTAINER_REQUEST_TIMEOUT_MS = "yarn.container.request.timeout.ms";
+  public static final String CLUSTER_MANAGER_REQUEST_TIMEOUT_MS =
+      "cluster-manager.container.request.timeout.ms";
+  private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
+
+  /**
+   * Flag to indicate if host-affinity is enabled for the job or not
+   */
+  public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
+  // This constant used to repeat the deprecated yarn key above, so no cluster-manager.* key was
+  // ever consulted and the deprecated fallback could never be reached.
+  // NOTE(review): the key below follows the cluster-manager.* naming of the sibling constants;
+  // confirm the exact name against the project's configuration reference.
+  public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED =
+      "cluster-manager.host-affinity.enabled";
+  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
+
+  /**
+   * Number of CPU cores to request from the cluster manager per container
+   */
+  public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
+  public static final String CLUSTER_MANAGER_MAX_CORES = "cluster-manager.cpu.cores";
+  private static final int DEFAULT_CPU_CORES = 1;
+
+  /**
+   * Memory, in megabytes, to request from the cluster manager per container
+   */
+  public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
+  public static final String CLUSTER_MANAGER_MEMORY_MB = "cluster-manager.container.memory.mb";
+  private static final int DEFAULT_CONTAINER_MEM = 1024;
+
+  /**
+   * Determines how frequently a container is allowed to fail before we give up and fail the job
+   */
+  public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
+  public static final String CLUSTER_MANAGER_RETRY_WINDOW_MS =
+      "cluster-manager.container.retry.window.ms";
+  private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
+
+  /**
+   * Maximum number of times Samza tries to restart a failed container
+   */
+  public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
+  public static final String CLUSTER_MANAGER_CONTAINER_RETRY_COUNT =
+      "cluster-manager.container.retry.count";
+  private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+
+  /**
+   * Determines whether a JMX server should be started on the job coordinator
+   * Default: true
+   */
+  public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
+  public static final String CLUSTER_MANAGER_JMX_ENABLED =
+      "cluster-manager.jobcoordinator.jmx.enabled";
+  private static final boolean DEFAULT_JMX_ENABLED = true;
+
+  /**
+   * The cluster managed job coordinator sleeps for a configurable time before checking again
+   * for termination. The sleep interval of the cluster managed job coordinator.
+   */
+  public static final String CLUSTER_MANAGER_SLEEP_MS =
+      "cluster-manager.jobcoordinator.sleep.interval.ms";
+  private static final int DEFAULT_CLUSTER_MANAGER_SLEEP_MS = 1000;
+
+  /**
+   * @param config the config whose entries back this view
+   */
+  public ClusterManagerConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @return allocator sleep time from {@link #ALLOCATOR_SLEEP_MS}, else from the deprecated
+   *         {@link #YARN_ALLOCATOR_SLEEP_MS}, else {@link #DEFAULT_ALLOCATOR_SLEEP_MS}
+   */
+  public int getAllocatorSleepTime() {
+    return getIntWithDeprecatedFallback(
+        ALLOCATOR_SLEEP_MS, YARN_ALLOCATOR_SLEEP_MS, DEFAULT_ALLOCATOR_SLEEP_MS);
+  }
+
+  /**
+   * @return cpu cores per container from {@link #CLUSTER_MANAGER_MAX_CORES}, else from the
+   *         deprecated {@link #CONTAINER_MAX_CPU_CORES}, else {@link #DEFAULT_CPU_CORES}
+   */
+  public int getNumCores() {
+    return getIntWithDeprecatedFallback(
+        CLUSTER_MANAGER_MAX_CORES, CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
+  }
+
+  /**
+   * @return memory per container in MB from {@link #CLUSTER_MANAGER_MEMORY_MB}, else from the
+   *         deprecated {@link #CONTAINER_MAX_MEMORY_MB}, else {@link #DEFAULT_CONTAINER_MEM}
+   */
+  public int getContainerMemoryMb() {
+    return getIntWithDeprecatedFallback(
+        CLUSTER_MANAGER_MEMORY_MB, CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
+  }
+
+  /**
+   * @return host-affinity flag from {@link #CLUSTER_MANAGER_HOST_AFFINITY_ENABLED}, else from
+   *         the deprecated {@link #HOST_AFFINITY_ENABLED}, else
+   *         {@link #DEFAULT_HOST_AFFINITY_ENABLED}
+   */
+  public boolean getHostAffinityEnabled() {
+    return getBooleanWithDeprecatedFallback(
+        CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
+  }
+
+  /**
+   * @return request timeout in millis from {@link #CLUSTER_MANAGER_REQUEST_TIMEOUT_MS}, else
+   *         from the deprecated {@link #CONTAINER_REQUEST_TIMEOUT_MS}, else
+   *         {@link #DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS}
+   */
+  public int getContainerRequestTimeout() {
+    return getIntWithDeprecatedFallback(
+        CLUSTER_MANAGER_REQUEST_TIMEOUT_MS, CONTAINER_REQUEST_TIMEOUT_MS,
+        DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS);
+  }
+
+  /**
+   * @return retry count from {@link #CLUSTER_MANAGER_CONTAINER_RETRY_COUNT}, else from the
+   *         deprecated {@link #CONTAINER_RETRY_COUNT}, else
+   *         {@link #DEFAULT_CONTAINER_RETRY_COUNT}
+   */
+  public int getContainerRetryCount() {
+    return getIntWithDeprecatedFallback(
+        CLUSTER_MANAGER_CONTAINER_RETRY_COUNT, CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
+  }
+
+  /**
+   * @return retry window in millis from {@link #CLUSTER_MANAGER_RETRY_WINDOW_MS}, else from
+   *         the deprecated {@link #CONTAINER_RETRY_WINDOW_MS}, else
+   *         {@link #DEFAULT_CONTAINER_RETRY_WINDOW_MS}
+   */
+  public int getContainerRetryWindowMs() {
+    return getIntWithDeprecatedFallback(
+        CLUSTER_MANAGER_RETRY_WINDOW_MS, CONTAINER_RETRY_WINDOW_MS,
+        DEFAULT_CONTAINER_RETRY_WINDOW_MS);
+  }
+
+  /**
+   * @return value of {@link #CLUSTER_MANAGER_SLEEP_MS}, read with
+   *         {@link #DEFAULT_CLUSTER_MANAGER_SLEEP_MS} as the default
+   */
+  public int getJobCoordinatorSleepInterval() {
+    return getInt(CLUSTER_MANAGER_SLEEP_MS, DEFAULT_CLUSTER_MANAGER_SLEEP_MS);
+  }
+
+  /**
+   * @return value of {@link #CLUSTER_MANAGER_FACTORY}, read with
+   *         {@link #CLUSTER_MANAGER_FACTORY_DEFAULT} as the default
+   */
+  public String getContainerManagerClass() {
+    return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
+  }
+
+  /**
+   * @return JMX flag from {@link #CLUSTER_MANAGER_JMX_ENABLED}, else from the deprecated
+   *         {@link #AM_JMX_ENABLED}, else {@code true}
+   */
+  public boolean getJmxEnabled() {
+    return getBooleanWithDeprecatedFallback(
+        CLUSTER_MANAGER_JMX_ENABLED, AM_JMX_ENABLED, DEFAULT_JMX_ENABLED);
+  }
+
+  /**
+   * Reads an int from {@code key}; if absent, from {@code deprecatedKey} (logging that the
+   * latter is deprecated); if both are absent, yields {@code defaultValue}.
+   */
+  private int getIntWithDeprecatedFallback(String key, String deprecatedKey, int defaultValue) {
+    if (containsKey(key)) {
+      return getInt(key);
+    }
+    if (containsKey(deprecatedKey)) {
+      log.info("Configuration {} is deprecated. Please use {}", deprecatedKey, key);
+      return getInt(deprecatedKey);
+    }
+    return defaultValue;
+  }
+
+  /**
+   * Reads a boolean from {@code key}; if absent, from {@code deprecatedKey} (logging that the
+   * latter is deprecated); if both are absent, yields {@code defaultValue}.
+   */
+  private boolean getBooleanWithDeprecatedFallback(String key, String deprecatedKey, boolean defaultValue) {
+    if (containsKey(key)) {
+      return getBoolean(key);
+    }
+    if (containsKey(deprecatedKey)) {
+      log.info("Configuration {} is deprecated. Please use {}", deprecatedKey, key);
+      return getBoolean(deprecatedKey);
+    }
+    return defaultValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index a3281c2..a615d4f 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -88,7 +88,7 @@ public class LocalityManager extends 
AbstractCoordinatorStreamManager {
 
   /**
    * Method to allow read container locality information from coordinator 
stream. This method is used
-   * in {@link org.apache.samza.coordinator.JobCoordinator}.
+   * in {@link org.apache.samza.coordinator.JobModelManager}.
    *
    * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 6473dfb..11207b2 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -66,7 +66,7 @@ public class TaskAssignmentManager extends 
AbstractCoordinatorStreamManager {
 
   /**
    * Method to allow read container task information from coordinator stream. 
This method is used
-   * in {@link org.apache.samza.coordinator.JobCoordinator}.
+   * in {@link org.apache.samza.coordinator.JobModelManager}.
    *
    * @return the map of taskName: containerId
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 0324e90..9329edf 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -31,7 +31,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -117,7 +117,7 @@ public class StorageRecovery extends CommandLine {
    * map
    */
   private void getContainerModels() {
-    JobModel jobModel = JobCoordinator.apply(jobConfig).jobModel();
+    JobModel jobModel = JobModelManager.apply(jobConfig).jobModel();
     containers = jobModel.getContainers();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 31b208f..91de18d 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -32,7 +32,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{Util, CommandLine, Logging}
 import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConversions._
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 
 import scala.collection.immutable.HashMap
 
@@ -141,8 +141,8 @@ class CheckpointTool(config: Config, newOffsets: 
TaskNameToCheckpointMap, manage
     info("Using %s" format manager)
 
     // Find all the TaskNames that would be generated for this job config
-    val coordinator = JobCoordinator(config)
-    val taskNames = coordinator
+    val jobModelManager = JobModelManager(config)
+    val taskNames = jobModelManager
       .jobModel
       .getContainers
       .values

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index bd7f3f5..e9a5108 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -45,12 +45,12 @@ import scala.collection.JavaConversions._
  * Helper companion object that is responsible for wiring up a JobCoordinator
  * given a Config object.
  */
-object JobCoordinator extends Logging {
+object JobModelManager extends Logging {
 
   /**
    * a volatile value to store the current instantiated 
<code>JobCoordinator</code>
    */
-  @volatile var currentJobCoordinator: JobCoordinator = null
+  @volatile var currentJobModelManager: JobModelManager = null
   val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
 
   /**
@@ -59,7 +59,7 @@ object JobCoordinator extends Logging {
    * configuration. The method will use this config to read all configuration
    * from the coordinator stream, and instantiate a JobCoordinator.
    */
-  def apply(coordinatorSystemConfig: Config, metricsRegistryMap: 
MetricsRegistryMap): JobCoordinator = {
+  def apply(coordinatorSystemConfig: Config, metricsRegistryMap: 
MetricsRegistryMap): JobModelManager = {
     val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new 
CoordinatorStreamSystemFactory()
     val coordinatorSystemConsumer = 
coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig,
 metricsRegistryMap)
     val coordinatorSystemProducer = 
coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig,
 metricsRegistryMap)
@@ -107,7 +107,7 @@ object JobCoordinator extends Logging {
     jobCoordinator
   }
 
-  def apply(coordinatorSystemConfig: Config): JobCoordinator = 
apply(coordinatorSystemConfig, new MetricsRegistryMap())
+  def apply(coordinatorSystemConfig: Config): JobModelManager = 
apply(coordinatorSystemConfig, new MetricsRegistryMap())
 
   /**
    * Build a JobCoordinator using a Samza job's configuration.
@@ -122,8 +122,8 @@ object JobCoordinator extends Logging {
 
     val server = new HttpServer
     server.addServlet("/*", new JobServlet(jobModelRef))
-    currentJobCoordinator = new JobCoordinator(jobModel, server, 
streamPartitionCountMonitor)
-    currentJobCoordinator
+    currentJobModelManager = new JobModelManager(jobModel, server, 
streamPartitionCountMonitor)
+    currentJobModelManager
   }
 
   /**
@@ -299,7 +299,7 @@ object JobCoordinator extends Logging {
  * coordinator's responsibility is simply to propagate the job model, and HTTP
  * server right now.</p>
  */
-class JobCoordinator(
+class JobModelManager(
   /**
    * The data model that describes the Samza job's containers and tasks.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index 95d01dd..f5c8a46 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -23,7 +23,7 @@ import java.io.{InputStream, OutputStream}
 import java.util.concurrent.CountDownLatch
 
 import org.apache.samza.SamzaException
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.job.ApplicationStatus.{New, Running, 
UnsuccessfulFinish}
 import org.apache.samza.job.util.ProcessKiller
 import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
@@ -31,7 +31,7 @@ import org.apache.samza.util.Logging
 
 import scala.collection.JavaConversions._
 
-class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: 
JobCoordinator) extends StreamJob with Logging {
+class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: 
JobModelManager) extends StreamJob with Logging {
   var jobStatus: Option[ApplicationStatus] = None
   var process: Process = null
 

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 81ef59a..475df52 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -25,7 +25,7 @@ import java.io.File
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{JobConfig, Config}
 import org.apache.samza.config.TaskConfig._
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, 
StreamJobFactory}
 import org.apache.samza.util.{Logging, Util}
 
@@ -40,7 +40,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging 
{
       throw new SamzaException("Container count larger than 1 is not supported 
for ProcessJobFactory")
     }
     
-    val coordinator = JobCoordinator(config)
+    val coordinator = JobModelManager(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 5acfe87..56881d4 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -29,7 +29,7 @@ import org.apache.samza.config.TaskConfig._
 import org.apache.samza.container.SamzaContainer
 import org.apache.samza.job.{ StreamJob, StreamJobFactory }
 import org.apache.samza.config.JobConfig._
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 
 /**
  * Creates a new Thread job with the given config
@@ -37,7 +37,7 @@ import org.apache.samza.coordinator.JobCoordinator
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
-    val coordinator = JobCoordinator(config)
+    val coordinator = JobModelManager(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     // Give developers a nice friendly warning if they've specified task.opts 
and are using a threaded job.

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
new file mode 100644
index 0000000..86c2440
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.apache.samza.SamzaException
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.config.Config
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.util.{Logging, Util}
+
+/**
+ * Constants shared by the metrics wiring for the container process manager.
+ */
+object ContainerProcessManagerMetrics {
+  // Source name used when creating and registering metrics reporters.
+  val sourceName: String = "ApplicationMaster"
+}
+
+/**
+ * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+ * registry, we might as well use it. This class takes Samza's application
+ * master state, and converts it to metrics.
+ *
+ * On construction it instantiates one metrics reporter per reporter name found
+ * in the config and registers the given registry with each of them.
+ * `start()` creates the gauges that read from `state` and starts the JVM
+ * metrics and all reporters; `stop()` stops the reporters.
+ *
+ * @param config   job configuration, used to look up reporter names and factory classes
+ * @param state    application state whose counters are exposed as gauges
+ * @param registry registry that is handed to the JVM metrics and to every reporter
+ */
+class ContainerProcessManagerMetrics(
+                                      val config: Config,
+                                      val state: SamzaApplicationState,
+                                      val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
+
+  val jvm = new JvmMetrics(registry)
+
+  // Map of reporter name -> reporter. A reporter without a configured factory
+  // class fails construction with a SamzaException.
+  val reporters = config.getMetricReporterNames.map(reporterName => {
+    val metricsFactoryClassName = config
+      .getMetricsFactoryClass(reporterName)
+      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
+
+    val reporter =
+      Util
+        .getObj[MetricsReporterFactory](metricsFactoryClassName)
+        .getMetricsReporter(reporterName, ContainerProcessManagerMetrics.sourceName, config)
+
+    reporter.register(ContainerProcessManagerMetrics.sourceName, registry)
+    (reporterName, reporter)
+  }).toMap
+
+  /**
+   * Creates the gauges backed by the application state, then starts the JVM
+   * metrics and every configured reporter.
+   */
+  def start() {
+    val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
+    val mNeededContainers = newGauge("needed-containers", () => state.neededResources.get())
+    val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get())
+    val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
+    val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
+    val mContainers = newGauge("container-count", () => state.containerCount)
+
+    // Exposed as 1/0 rather than a boolean.
+    val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
+    val mLocalityMatchedRequests = newGauge(
+      "locality-matched",
+      () => {
+        // Guard against dividing by zero before any request has been made.
+        // NOTE(review): if both counters are integral, this division truncates
+        // (yielding 0 until every request is matched) - confirm that is intended.
+        if (state.containerRequests.get() != 0) {
+          state.matchedResourceRequests.get() / state.containerRequests.get()
+        } else {
+          0L
+        }
+      })
+
+    jvm.start
+    reporters.values.foreach(_.start)
+  }
+
+  /**
+   * Stops every configured reporter.
+   */
+  def stop() {
+    reporters.values.foreach(_.stop)
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
new file mode 100644
index 0000000..0d13fb1
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.job.CommandBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class MockClusterResourceManager extends ClusterResourceManager {
+  Set<SamzaResource> releasedResources = new HashSet<>();
+  List<SamzaResource> resourceRequests = new ArrayList<>();
+  List<SamzaResourceRequest> cancelledRequests = new ArrayList<>();
+  List<SamzaResource> launchedResources = new ArrayList<>();
+  List<MockContainerListener> mockContainerListeners = new 
ArrayList<MockContainerListener>();
+  Throwable nextException = null;
+
+  public MockClusterResourceManager(ClusterResourceManager.Callback callback) {
+    super(callback);
+  }
+
+  @Override
+  public void start() {
+
+  }
+
+  @Override
+  public void requestResources(SamzaResourceRequest resourceRequest) {
+    SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), 
resourceRequest.getMemoryMB(), resourceRequest.getPreferredHost(), 
UUID.randomUUID().toString());
+    List<SamzaResource> resources = Collections.singletonList(resource);
+    resourceRequests.addAll(resources);
+  }
+
+  @Override
+  public void cancelResourceRequest(SamzaResourceRequest request) {
+    cancelledRequests.add(request);
+  }
+
+  @Override
+  public void releaseResources(SamzaResource resource) {
+    releasedResources.add(resource);
+  }
+
+  @Override
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder 
builder) throws SamzaContainerLaunchException {
+    if (nextException != null) {
+      throw new SamzaContainerLaunchException(nextException);
+    }
+    launchedResources.add(resource);
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postRunContainer(launchedResources.size());
+    }
+
+  }
+
+  @Override
+  public void stop(SamzaApplicationState.SamzaAppStatus status) {
+
+  }
+
+  public void registerContainerListener(MockContainerListener listener) {
+    mockContainerListeners.add(listener);
+  }
+
+  public void clearContainerListeners() {
+    mockContainerListeners.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
new file mode 100644
index 0000000..5079625
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MockClusterResourceManagerCallback implements 
ClusterResourceManager.Callback {
+  List<SamzaResource> resources = new ArrayList<>();
+  List<SamzaResourceStatus> resourceStatuses = new ArrayList<>();
+  Throwable error;
+
+  @Override
+  public void onResourcesAvailable(List<SamzaResource> resourceList) {
+    resources.addAll(resourceList);
+  }
+
+  @Override
+  public void onResourcesCompleted(List<SamzaResourceStatus> 
resourceStatusList) {
+    resourceStatuses.addAll(resourceStatusList);
+  }
+
+  @Override
+  public void onError(Throwable e) {
+    error = e;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
new file mode 100644
index 0000000..6189fe7
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+
+import java.lang.reflect.Field;
+
+import java.util.Map;
+
+/**
+ * {@link ContainerAllocator} variant for tests that counts how many containers
+ * were requested and exposes the allocator's internal request state.
+ */
+public class MockContainerAllocator extends ContainerAllocator {
+  // Running total of entries across all requestResources() calls.
+  public int requestedContainers = 0;
+
+  public MockContainerAllocator(ClusterResourceManager manager,
+                                Config config,
+                                SamzaApplicationState state) {
+    super(manager, config, state);
+  }
+
+  /**
+   * Adds the size of the mapping to {@code requestedContainers}, then delegates
+   * to the real allocator.
+   */
+  @Override
+  public void requestResources(Map<Integer, String> containerToHostMappings) {
+    requestedContainers = requestedContainers + containerToHostMappings.size();
+    super.requestResources(containerToHostMappings);
+  }
+
+  /**
+   * Reads the {@code resourceRequestState} field declared on
+   * {@link AbstractContainerAllocator} via reflection.
+   *
+   * @return the request state held by this allocator
+   * @throws Exception if the field cannot be found or read
+   */
+  public ResourceRequestState getContainerRequestState() throws Exception {
+    Field requestStateField = AbstractContainerAllocator.class.getDeclaredField("resourceRequestState");
+    requestStateField.setAccessible(true);
+
+    Object rawState = requestStateField.get(this);
+    return (ResourceRequestState) rawState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
new file mode 100644
index 0000000..db70c38
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test helper that waits for up to four count-based conditions (containers
+ * added, released, assigned and running) to be reached, optionally running a
+ * set of assertions at the moment each condition is reached.
+ *
+ * The {@code post*} methods are invoked by the mocks with the latest running
+ * total; {@link #verify()} blocks until every condition was reached or a
+ * timeout expires, then reports any failure.
+ */
+public class MockContainerListener {
+  // Counts down once for each condition that was not already satisfied at construction.
+  private final CountDownLatch conditionLatch;
+
+  private final AsyncCountableCondition containersAdded;
+  private final AsyncCountableCondition containersReleased;
+  private final AsyncCountableCondition containersAssigned;
+  private final AsyncCountableCondition containersRunning;
+
+  private final AsyncCountableCondition[] allConditions;
+
+  /**
+   * @param numExpectedContainersAdded    total at which the "added" condition is reached
+   * @param numExpectedContainersReleased total at which the "released" condition is reached
+   * @param numExpectedContainersAssigned total at which the "assigned" condition is reached
+   * @param numExpectedContainersRunning  total at which the "running" condition is reached
+   * @param addContainerAssertions        assertions to run when "added" is reached, or null
+   * @param releaseContainerAssertions    assertions to run when "released" is reached, or null
+   * @param assignContainerAssertions     assertions to run when "assigned" is reached, or null
+   * @param runContainerAssertions        assertions to run when "running" is reached, or null
+   */
+  public MockContainerListener(int numExpectedContainersAdded,
+                               int numExpectedContainersReleased,
+                               int numExpectedContainersAssigned,
+                               int numExpectedContainersRunning,
+                               Runnable addContainerAssertions,
+                               Runnable releaseContainerAssertions,
+                               Runnable assignContainerAssertions,
+                               Runnable runContainerAssertions) {
+    containersAdded = new AsyncCountableCondition("containers added", numExpectedContainersAdded, addContainerAssertions);
+    containersReleased = new AsyncCountableCondition("containers released", numExpectedContainersReleased, releaseContainerAssertions);
+    containersAssigned = new AsyncCountableCondition("containers assigned", numExpectedContainersAssigned, assignContainerAssertions);
+    containersRunning = new AsyncCountableCondition("containers running", numExpectedContainersRunning, runContainerAssertions);
+
+    allConditions = new AsyncCountableCondition[] {containersAdded, containersReleased, containersAssigned, containersRunning};
+
+    // Conditions with an expected count of zero are satisfied from the start
+    // and must not be waited for.
+    int unsatisfiedConditions = 0;
+    for (AsyncCountableCondition condition : allConditions) {
+      if (!condition.isSatisfied()) {
+        unsatisfiedConditions++;
+      }
+    }
+
+    conditionLatch = new CountDownLatch(unsatisfiedConditions);
+  }
+
+  public void postAddContainer(int totalAddedContainers) {
+    if (containersAdded.update(totalAddedContainers)) {
+      conditionLatch.countDown();
+    }
+  }
+
+  public void postReleaseContainers(int totalReleasedContainers) {
+    if (containersReleased.update(totalReleasedContainers)) {
+      conditionLatch.countDown();
+    }
+  }
+
+  public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) {
+    if (containersAssigned.update(totalAssignedContainers)) {
+      conditionLatch.countDown();
+    }
+  }
+
+  public void postRunContainer(int totalRunningContainers) {
+    if (containersRunning.update(totalRunningContainers)) {
+      conditionLatch.countDown();
+    }
+  }
+
+  /**
+   * This method should be called in the main thread. It waits for all the conditions to occur in the other
+   * threads and then verifies that they were in fact satisfied.
+   *
+   * The wait is bounded to 5 seconds; a timeout is not reported directly but
+   * surfaces as an unsatisfied condition in the per-condition checks.
+   *
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public void verify() throws InterruptedException {
+    conditionLatch.await(5, TimeUnit.SECONDS);
+
+    for (AsyncCountableCondition condition : allConditions) {
+      condition.verify();
+    }
+  }
+
+  /**
+   * A named condition that becomes satisfied the first time the reported count
+   * equals the expected count.
+   */
+  private static class AsyncCountableCondition {
+    private boolean satisfied = false;
+    private final int expectedCount;
+    private final Runnable postConditionAssertions;
+    private final String name;
+    // Failure captured from postConditionAssertions, rethrown by verify().
+    private AssertionError assertionError = null;
+
+    private AsyncCountableCondition(String name, int expectedCount, Runnable postConditionAssertions) {
+      this.name = name;
+      this.expectedCount = expectedCount;
+      if (expectedCount == 0) {
+        satisfied = true;
+      }
+      this.postConditionAssertions = postConditionAssertions;
+    }
+
+    /**
+     * Reports the latest count.
+     *
+     * @return true only on the call that transitions this condition to satisfied
+     */
+    public boolean update(int latestCount) {
+      if (!satisfied && latestCount == expectedCount) {
+        if (postConditionAssertions != null) {
+          try {
+            postConditionAssertions.run();
+          } catch (Throwable t) {
+            // Captured rather than thrown so the failure can be rethrown from verify().
+            assertionError = new AssertionError(String.format("Assertion for '%s' failed", name), t);
+          }
+        }
+
+        satisfied = true;
+        return true;
+      }
+      return false;
+    }
+
+    public boolean isSatisfied() {
+      return satisfied;
+    }
+
+    /**
+     * Fails if the condition was never satisfied, or rethrows the failure
+     * captured while running the post-condition assertions.
+     */
+    public void verify() {
+      assertTrue(String.format("Condition '%s' was not satisfied", name), isSatisfied());
+
+      if (assertionError != null) {
+        throw assertionError;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
new file mode 100644
index 0000000..3aa58b2
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+
+public class MockContainerRequestState extends ResourceRequestState {
+  private final List<MockContainerListener> mockContainerListeners = new 
ArrayList<MockContainerListener>();
+  private int numAddedContainers = 0;
+  private int numReleasedContainers = 0;
+  private int numAssignedContainers = 0;
+  public Queue<SamzaResourceRequest> assignedRequests = new LinkedList<>();
+
+  public MockContainerRequestState(ClusterResourceManager manager,
+                                   boolean hostAffinityEnabled) {
+    super(hostAffinityEnabled, manager);
+  }
+
+  @Override
+  public synchronized void updateStateAfterAssignment(SamzaResourceRequest 
request, String assignedHost, SamzaResource resource) {
+    super.updateStateAfterAssignment(request, assignedHost, resource);
+
+    numAssignedContainers++;
+    assignedRequests.add(request);
+
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postUpdateRequestStateAfterAssignment(numAssignedContainers);
+    }
+  }
+
+  @Override
+  public synchronized void addResource(SamzaResource container) {
+    super.addResource(container);
+
+    numAddedContainers++;
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postAddContainer(numAddedContainers);
+    }
+  }
+
+  @Override
+  public synchronized int releaseExtraResources() {
+    numReleasedContainers += super.releaseExtraResources();
+
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postReleaseContainers(numReleasedContainers);
+    }
+
+    return numAddedContainers;
+  }
+
+  @Override
+  public void releaseUnstartableContainer(SamzaResource container) {
+    super.releaseUnstartableContainer(container);
+
+    numReleasedContainers += 1;
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postReleaseContainers(numReleasedContainers);
+    }
+  }
+
+
+  public void registerContainerListener(MockContainerListener listener) {
+    mockContainerListeners.add(listener);
+  }
+
+  public void clearContainerListeners() {
+    mockContainerListeners.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java
new file mode 100644
index 0000000..4f44ced
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * {@link HttpServer} stand-in for tests. Starting and stopping only flip the
+ * inherited {@code running} flag, and the reported URL is a fixed localhost
+ * address.
+ */
+public class MockHttpServer extends HttpServer {
+
+  /**
+   * Passes all arguments to the superclass and immediately marks the server
+   * as running.
+   */
+  public MockHttpServer(String rootPath, int port, String resourceBasePath, ServletHolder defaultHolder) {
+    super(rootPath, port, resourceBasePath, defaultHolder);
+    start();
+  }
+
+  /** Marks the server as running. */
+  @Override
+  public void start() {
+    super.running_$eq(true);
+  }
+
+  /** Marks the server as not running. */
+  @Override
+  public void stop() {
+    super.running_$eq(false);
+  }
+
+  /**
+   * @return a fixed localhost URL while the server is marked as running;
+   *         {@code null} when it is not running or the URL cannot be built
+   */
+  @Override
+  public URL getUrl() {
+    if (running()) {
+      try {
+        return new URL("http://localhost:12345/");
+      } catch (MalformedURLException mue) {
+        mue.printStackTrace();
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
new file mode 100644
index 0000000..f147570
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ContainerAllocator} running against mock cluster-manager
+ * components. Each test gets a fresh allocator whose internal request state is
+ * replaced, via reflection, with a {@link MockContainerRequestState} so that
+ * the test can observe additions, releases and assignments.
+ */
+public class TestContainerAllocator {
+  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
+  private final Config config = getConfig();
+  private final JobModelManager reader = getJobModelReader(1);
+  private final SamzaApplicationState state = new SamzaApplicationState(reader);
+  private ContainerAllocator containerAllocator;
+  private MockContainerRequestState requestState;
+  private Thread allocatorThread;
+
+  @Before
+  public void setup() throws Exception {
+    containerAllocator = new ContainerAllocator(manager, config, state);
+    requestState = new MockContainerRequestState(manager, false);
+    // Swap the allocator's private request state for the mock.
+    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState");
+    requestStateField.setAccessible(true);
+    requestStateField.set(containerAllocator, requestState);
+    // Created here but started by the individual tests that need it.
+    allocatorThread = new Thread(containerAllocator);
+  }
+
+
+
+  @After
+  public void teardown() throws Exception {
+    reader.stop();
+    containerAllocator.stop();
+  }
+
+
+  private static Config getConfig() {
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        put("yarn.container.count", "1");
+        put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
+        put("yarn.container.memory.mb", "512");
+        put("yarn.package.path", "/foo");
+        put("task.inputs", "test-system.test-stream");
+        put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
+        put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
+        put("yarn.container.retry.count", "1");
+        put("yarn.container.retry.window.ms", "1999999999");
+        put("yarn.allocator.sleep.ms", "10");
+      }
+    });
+
+    Map<String, String> map = new HashMap<>();
+    map.putAll(config);
+    return new MapConfig(map);
+  }
+
+  private static JobModelManager getJobModelReader(int containerCount) {
+    //Ideally, the JobModelReader should be constructed independent of HttpServer.
+    //That way it becomes easier to mock objects. Save it for later.
+
+    HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+    for (int i = 0; i < containerCount; i++) {
+      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
+      containers.put(i, container);
+    }
+    JobModel jobModel = new JobModel(getConfig(), containers);
+    return new JobModelManager(jobModel, server, null);
+  }
+
+
+  /**
+   * Adds all containers returned to ANY_HOST only
+   */
+  @Test
+  public void testAddContainer() throws Exception {
+    assertNull(requestState.getResourcesOnAHost("abc"));
+    assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+
+    containerAllocator.addResource(new SamzaResource(1, 1000, "abc", "id1"));
+    containerAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1"));
+
+
+    assertNull(requestState.getResourcesOnAHost("abc"));
+    assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+    assertEquals(2, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
+  }
+
+  /**
+   * Test requestContainers
+   */
+  @Test
+  public void testRequestContainers() throws Exception {
+    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+      {
+        put(0, "abc");
+        put(1, "def");
+        put(2, null);
+        put(3, "abc");
+      }
+    };
+
+    allocatorThread.start();
+
+    containerAllocator.requestResources(containersToHostMapping);
+
+    assertEquals(4, manager.resourceRequests.size());
+
+    assertNotNull(requestState);
+
+    assertEquals(4, requestState.numPendingRequests());
+
+    // If host-affinity is not enabled, it doesn't update the requestMap
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertEquals(0, requestState.getRequestsToCountMap().keySet().size());
+  }
+
+  /**
+   * Test request containers with no containerToHostMapping makes the right number of requests
+   */
+  @Test
+  public void testRequestContainersWithNoMapping() throws Exception {
+    int containerCount = 4;
+    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+    for (int i = 0; i < containerCount; i++) {
+      containersToHostMapping.put(i, null);
+    }
+    allocatorThread.start();
+
+    containerAllocator.requestResources(containersToHostMapping);
+
+    assertNotNull(requestState);
+
+    assertEquals(containerCount, requestState.numPendingRequests());
+
+    // If host-affinity is not enabled, it doesn't update the requestMap
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertEquals(0, requestState.getRequestsToCountMap().keySet().size());
+  }
+
+  /**
+   * Extra allocated containers that are returned by the RM and unused by the AM should be released.
+   * Containers are considered "extra" only when there are no more pending requests to fulfill
+   * @throws Exception
+   */
+  @Test
+  public void testAllocatorReleasesExtraContainers() throws Exception {
+    final SamzaResource resource = new SamzaResource(1, 1000, "abc", "id1");
+    final SamzaResource resource1 = new SamzaResource(1, 1000, "abc", "id2");
+    final SamzaResource resource2 = new SamzaResource(1, 1000, "def", "id3");
+
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, new Runnable() {
+      @Override
+      public void run() {
+
+        assertTrue(manager.releasedResources.contains(resource1));
+        assertTrue(manager.releasedResources.contains(resource2));
+
+        // Test that state is cleaned up
+        assertEquals(0, requestState.numPendingRequests());
+        assertEquals(0, requestState.getRequestsToCountMap().size());
+        assertNull(requestState.getResourcesOnAHost("abc"));
+        assertNull(requestState.getResourcesOnAHost("def"));
+      }
+    }, null, null);
+    requestState.registerContainerListener(listener);
+
+    allocatorThread.start();
+
+    containerAllocator.requestResource(0, "abc");
+
+    containerAllocator.addResource(resource);
+    containerAllocator.addResource(resource1);
+    containerAllocator.addResource(resource2);
+
+    listener.verify();
+  }
+
+
+  /**
+   * If the container fails to start e.g because it fails to connect to a NM on a host that
+   * is down, the allocator should request a new container on a different host.
+   */
+  @Test
+  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
+    final SamzaResource container = new SamzaResource(1, 1024, "2", "id0");
+    final SamzaResource container1 = new SamzaResource(1, 1024, "2", "id0");
+    manager.nextException = new IOException("Cant connect to RM");
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() {
+      @Override
+      public void run() {
+        // The failed container should be released. The successful one should not.
+        assertNotNull(manager.releasedResources);
+        assertEquals(1, manager.releasedResources.size());
+        assertTrue(manager.releasedResources.contains(container));
+      }
+    },
+        new Runnable() {
+          @Override
+          public void run() {
+            // Test that the first request assignment had a preferred host and the retry didn't
+            assertEquals(2, requestState.assignedRequests.size());
+
+            SamzaResourceRequest request = requestState.assignedRequests.remove();
+            assertEquals(0, request.getContainerID());
+            assertEquals("2", request.getPreferredHost());
+
+            request = requestState.assignedRequests.remove();
+            assertEquals(0, request.getContainerID());
+            assertEquals("ANY_HOST", request.getPreferredHost());
+
+            // This routine should be called after the retry is assigned, but before it's started.
+            // So there should still be 1 container needed.
+            assertEquals(1, state.neededResources.get());
+          }
+        }, null
+    );
+    state.neededResources.set(1);
+    requestState.registerContainerListener(listener);
+
+    containerAllocator.requestResource(0, "2");
+    containerAllocator.addResource(container);
+    containerAllocator.addResource(container1);
+    allocatorThread.start();
+
+    listener.verify();
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
new file mode 100644
index 0000000..4fd1018
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestContainerProcessManager {
+  // Mock cluster manager (and the callback it is constructed with) shared by
+  // every ContainerProcessManager created in these tests.
+  private final MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
+  private final MockClusterResourceManager manager = new 
MockClusterResourceManager(callback);
+
+  // Set to true by the stand-in allocator thread installed in testOnInit().
+  private static volatile boolean isRunning = false;
+
+  // Base configuration entries used to build the "config" field below.
+  private Map<String, String> configVals = new HashMap<String, String>()  {
+    {
+      put("yarn.container.count", "1");
+      put("systems.test-system.samza.factory", 
"org.apache.samza.job.yarn.MockSystemFactory");
+      put("yarn.container.memory.mb", "512");
+      put("yarn.package.path", "/foo");
+      put("task.inputs", "test-system.test-stream");
+      put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
+      put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
+      put("yarn.container.retry.count", "1");
+      put("yarn.container.retry.window.ms", "1999999999");
+      put("yarn.allocator.sleep.ms", "1");
+      put("yarn.container.request.timeout.ms", "2");
+    }
+  };
+  // Base config; getConfig() and getConfigWithHostAffinity() copy from it.
+  private Config config = new MapConfig(configVals);
+
+  /**
+   * Builds a fresh, independent copy of the base test configuration.
+   *
+   * @return a new {@link MapConfig} holding the same entries as {@code config}
+   */
+  private Config getConfig() {
+    Map<String, String> copy = new HashMap<>(config);
+    return new MapConfig(copy);
+  }
+
+  /**
+   * Builds a copy of the base test configuration with host-affinity enabled.
+   *
+   * @return a new {@link MapConfig} with
+   *         {@code yarn.samza.host-affinity.enabled} set to {@code "true"}
+   */
+  private Config getConfigWithHostAffinity() {
+    Map<String, String> withAffinity = new HashMap<>(config);
+    withAffinity.put("yarn.samza.host-affinity.enabled", "true");
+    return new MapConfig(withAffinity);
+  }
+
+  // Mock HTTP server; created in setup() and stopped in teardown().
+  private HttpServer server = null;
+
+  // Application state; rebuilt in setup() before every test.
+  private SamzaApplicationState state = null;
+
+
+  /**
+   * Creates a {@link JobModelManager} over a job model with
+   * {@code containerCount} empty containers, and publishes that model through
+   * {@code JobModelManager.jobModelRef()}.
+   *
+   * The mocked {@link LocalityManager} reports host "abc" for container 0.
+   *
+   * @param containerCount number of container models to create (ids 0..n-1)
+   * @return a manager wrapping the generated job model and {@code this.server}
+   */
+  private JobModelManager getCoordinator(int containerCount) {
+    Map<Integer, ContainerModel> containerModels = new HashMap<>();
+    for (int id = 0; id < containerCount; id++) {
+      containerModels.put(id,
+          new ContainerModel(id, new HashMap<TaskName, TaskModel>()));
+    }
+
+    Map<String, String> hostMapping = new HashMap<>();
+    hostMapping.put(SetContainerHostMapping.HOST_KEY, "abc");
+    Map<Integer, Map<String, String>> localityMap = new HashMap<>();
+    localityMap.put(0, hostMapping);
+
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
+
+    JobModel jobModel = new JobModel(getConfig(), containerModels,
+        mockLocalityManager);
+    JobModelManager.jobModelRef().getAndSet(jobModel);
+
+    return new JobModelManager(jobModel, this.server, null);
+  }
+
+  /**
+   * Creates the mock HTTP server and a fresh application state backed by a
+   * single-container job model before each test.
+   */
+  @Before
+  public void setup() throws Exception {
+    ServletHolder defaultServlet = new ServletHolder(DefaultServlet.class);
+    // The server must be assigned first: getCoordinator() reads this.server.
+    this.server = new MockHttpServer("/", 7777, null, defaultServlet);
+    this.state = new SamzaApplicationState(getCoordinator(1));
+  }
+
+  /**
+   * Looks up a field declared on the runtime class of {@code object} and makes
+   * it accessible, so tests can read or replace private collaborators.
+   *
+   * @param fieldName name of the declared field
+   * @param object instance whose class declares the field
+   * @return the accessible {@link Field}
+   * @throws Exception if the field lookup or the accessibility change fails
+   */
+  private Field getPrivateFieldFromTaskManager(String fieldName,
+      ContainerProcessManager object) throws Exception {
+    Class<?> managerClass = object.getClass();
+    Field privateField = managerClass.getDeclaredField(fieldName);
+    privateField.setAccessible(true);
+    return privateField;
+  }
+
+
+  /**
+   * Verifies that the manager picks the allocator implementation matching the
+   * host-affinity setting, and that the configured container memory and cpu
+   * cores reach the allocator in both cases.
+   */
+  @Test
+  public void testContainerProcessManager() throws Exception {
+    // Host-affinity not enabled: expect the plain ContainerAllocator.
+    Map<String, String> settings = new HashMap<>();
+    settings.putAll(getConfig());
+    settings.put("yarn.container.memory.mb", "500");
+    settings.put("yarn.container.cpu.cores", "5");
+
+    ContainerProcessManager processManager = new ContainerProcessManager(
+        new MapConfig(settings), state, new MetricsRegistryMap(), manager);
+
+    Field allocatorField =
+        getPrivateFieldFromTaskManager("containerAllocator", processManager);
+    AbstractContainerAllocator allocator =
+        (AbstractContainerAllocator) allocatorField.get(processManager);
+    assertEquals(ContainerAllocator.class, allocator.getClass());
+    // The samza-exposed container settings must be honored by the allocator.
+    assertEquals(500, allocator.containerMemoryMb);
+    assertEquals(5, allocator.containerNumCpuCores);
+
+    // Host-affinity enabled: expect the HostAwareContainerAllocator.
+    settings.clear();
+    settings.putAll(getConfigWithHostAffinity());
+    settings.put("yarn.container.memory.mb", "500");
+    settings.put("yarn.container.cpu.cores", "5");
+
+    processManager = new ContainerProcessManager(
+        new MapConfig(settings), state, new MetricsRegistryMap(), manager);
+
+    allocatorField =
+        getPrivateFieldFromTaskManager("containerAllocator", processManager);
+    allocator = (AbstractContainerAllocator) allocatorField.get(processManager);
+    assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
+    // The samza-exposed container settings must be honored by the allocator.
+    assertEquals(500, allocator.containerMemoryMb);
+    assertEquals(5, allocator.containerNumCpuCores);
+  }
+
+  /**
+   * Verifies that, after start(), the (replaced) allocator thread has run and
+   * one container has been requested.
+   */
+  @Test
+  public void testOnInit() throws Exception {
+    Config conf = getConfig();
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
+        manager
+    );
+
+    MockContainerAllocator allocator = new MockContainerAllocator(
+        manager,
+        conf,
+        state);
+
+    // Swap in the mock allocator via reflection so requests can be counted.
+    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
+
+    // Replace the allocator thread with one that only records that it ran.
+    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, new Thread() {
+      public void run() {
+        isRunning = true;
+      }
+    });
+
+    taskManager.start();
+    // Give the replacement thread time to run before checking the flag.
+    Thread.sleep(1000);
+
+    // Verify Allocator thread has started running
+    assertTrue(isRunning);
+
+    // Verify the remaining state
+    assertEquals(1, state.neededResources.get());
+    assertEquals(1, allocator.requestedContainers);
+
+    taskManager.stop();
+  }
+
+  /**
+   * Verifies that the allocator thread is alive after start() and has
+   * terminated after stop().
+   */
+  @Test
+  public void testOnShutdown() throws Exception {
+    Config conf = getConfig();
+    ContainerProcessManager taskManager =  new ContainerProcessManager(
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
+        manager
+    );
+    taskManager.start();
+
+    Thread.sleep(100);
+
+    Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager(
+        "allocatorThread", taskManager).get(taskManager);
+    assertTrue(allocatorThread.isAlive());
+
+    taskManager.stop();
+
+    // Wait (bounded) for the allocator thread to exit instead of relying on a
+    // fixed sleep, which races with thread termination on slow machines.
+    allocatorThread.join(1000);
+    assertFalse(allocatorThread.isAlive());
+
+  }
+
+  /**
+   * Test Task Manager should stop when all containers finish
+   */
+  @Test
+  public void testTaskManagerShouldStopWhenContainersFinish() {
+    ContainerProcessManager processManager = new ContainerProcessManager(
+        new MapConfig(getConfig()), state, new MetricsRegistryMap(), manager);
+
+    processManager.start();
+    assertFalse(processManager.shouldShutdown());
+
+    // Report a resource as completed with the SUCCESS exit status.
+    SamzaResourceStatus finished = new SamzaResourceStatus(
+        "123", "diagnostics", SamzaResourceStatus.SUCCESS);
+    processManager.onResourceCompleted(finished);
+
+    assertTrue(processManager.shouldShutdown());
+  }
+
+  /**
+   * Test Task Manager should request a new container when a task fails with
+   * unknown exit code.
+   * When host-affinity is not enabled, it will always request for ANY_HOST
+   */
+  @Test
+  public void testNewContainerRequestedOnFailureWithUnknownCode() throws 
Exception {
+    Config conf = getConfig();
+
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
+        manager
+    );
+
+    MockContainerAllocator allocator = new MockContainerAllocator(
+        manager,
+        conf,
+        state);
+
+    // Swap in the mock allocator and a thread running it via reflection.
+    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
+
+    Thread thread = new Thread(allocator);
+    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
+
+    // start triggers a request
+    taskManager.start();
+
+    assertFalse(taskManager.shouldShutdown());
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+
+
+    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+    taskManager.onResourceAllocated(container);
+
+    // Allow container to run and update state
+    Thread.sleep(300);
+
+    // Create first container failure
+    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));
+
+    // The above failure should trigger a container request
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(ResourceRequestState.ANY_HOST, 
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+
+
+    assertFalse(taskManager.shouldShutdown());
+    assertFalse(state.jobHealthy.get());
+    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(0, manager.releasedResources.size());
+
+    // Hand the same resource back to satisfy the retry request.
+    taskManager.onResourceAllocated(container);
+
+    // Allow container to run and update state
+    Thread.sleep(1000);
+
+    assertTrue(state.jobHealthy.get());
+
+    // Create a second failure
+    taskManager.onResourceCompleted(new 
SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));
+
+
+    // The above failure should trigger a job shutdown because our retry count
+    // is set to 1
+    assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(0, manager.releasedResources.size());
+    assertFalse(state.jobHealthy.get());
+    assertTrue(taskManager.shouldShutdown());
+    assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);
+
+    taskManager.stop();
+  }
+
+  /**
+   * Test AM requests a new container when a task fails
+   * Error codes with same behavior - Disk failure, preemption and aborted
+   */
+  @Test
+  public void testNewContainerRequestedOnFailureWithKnownCode()
+      throws Exception {
+    // NOTE: the original also built a local "config" map without
+    // "yarn.container.retry.count", but never passed it anywhere (and it
+    // shadowed the "config" field). That dead code is removed; the manager
+    // still runs with the unmodified base configuration.
+    Config conf = getConfig();
+
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
+        manager
+    );
+
+    MockContainerAllocator allocator = new MockContainerAllocator(
+        manager,
+        conf,
+        state);
+    // Swap in the mock allocator and a thread running it via reflection.
+    getPrivateFieldFromTaskManager("containerAllocator", taskManager)
+        .set(taskManager, allocator);
+
+    Thread thread = new Thread(allocator);
+    getPrivateFieldFromTaskManager("allocatorThread", taskManager)
+        .set(taskManager, thread);
+
+    // Start the task manager
+    taskManager.start();
+    assertFalse(taskManager.shouldShutdown());
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+
+    SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
+    taskManager.onResourceAllocated(container);
+
+    // Allow container to run and update state
+    Thread.sleep(300);
+
+    // Create container failure - with SamzaResourceStatus.DISK_FAIL
+    taskManager.onResourceCompleted(new SamzaResourceStatus(
+        container.getResourceID(), "Disk failure",
+        SamzaResourceStatus.DISK_FAIL));
+
+    // The above failure should trigger a container request
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertFalse(taskManager.shouldShutdown());
+    assertFalse(state.jobHealthy.get());
+    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(0, manager.releasedResources.size());
+    assertEquals(ResourceRequestState.ANY_HOST,
+        allocator.getContainerRequestState().peekPendingRequest()
+            .getPreferredHost());
+
+    // Create container failure - with SamzaResourceStatus.PREEMPTED
+    taskManager.onResourceCompleted(new SamzaResourceStatus(
+        container.getResourceID(), "Preemption",
+        SamzaResourceStatus.PREEMPTED));
+
+    // The above failure should trigger a container request
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertFalse(taskManager.shouldShutdown());
+    assertFalse(state.jobHealthy.get());
+    assertEquals(ResourceRequestState.ANY_HOST,
+        allocator.getContainerRequestState().peekPendingRequest()
+            .getPreferredHost());
+
+    // Create container failure - with SamzaResourceStatus.ABORTED
+    taskManager.onResourceCompleted(new SamzaResourceStatus(
+        container.getResourceID(), "Aborted",
+        SamzaResourceStatus.ABORTED));
+
+    // The above failure should trigger a container request
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(0, manager.releasedResources.size());
+    assertFalse(taskManager.shouldShutdown());
+    assertFalse(state.jobHealthy.get());
+    assertEquals(ResourceRequestState.ANY_HOST,
+        allocator.getContainerRequestState().peekPendingRequest()
+            .getPreferredHost());
+
+    taskManager.stop();
+  }
+
+  /**
+   * Smoke test: a ContainerProcessManager can be started and handed an
+   * allocated resource both without and with a framework path
+   * ({@code JobConfig.SAMZA_FWK_PATH()}) in its configuration.
+   */
+  @Test
+  public void testAppMasterWithFwk() {
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+        new MapConfig(config),
+        state,
+        new MetricsRegistryMap(),
+        manager
+    );
+    taskManager.start();
+    SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
+    assertFalse(taskManager.shouldShutdown());
+    taskManager.onResourceAllocated(container2);
+
+    configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever");
+    Config config1 = new MapConfig(configVals);
+
+    // Use config1 here: the original passed "config" again, leaving config1
+    // unused, so the framework-path configuration was not explicitly used.
+    ContainerProcessManager taskManager1 = new ContainerProcessManager(
+        config1,
+        state,
+        new MetricsRegistryMap(),
+        manager
+    );
+    taskManager1.start();
+    taskManager1.onResourceAllocated(container2);
+  }
+
+
+
+  /** Stops the mock HTTP server created in setup() after each test. */
+  @After
+  public void teardown() {
+    server.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
new file mode 100644
index 0000000..7a514e8
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestContainerRequestState {
+
+  // Mock cluster manager (and the callback it is constructed with) handed to
+  // every ResourceRequestState under test.
+  private final MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
+  private final MockClusterResourceManager manager = new 
MockClusterResourceManager(callback);
+
+  // Local alias for ResourceRequestState.ANY_HOST.
+  private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
+
+  /**
+   * Test state after a request is submitted
+   */
+  @Test
+  public void testUpdateRequestState() {
+    // Host-affinity is enabled
+    ResourceRequestState state = new ResourceRequestState(true, manager);
+    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
+    state.addResourceRequest(request);
+
+    assertNotNull(manager.resourceRequests);
+    assertEquals(1, manager.resourceRequests.size());
+
+    // Was assertNotNull(<boolean expression>), which boxes the boolean and
+    // therefore could never fail; assert the pending count itself.
+    assertEquals(1, state.numPendingRequests());
+
+    assertNotNull(state.getRequestsToCountMap());
+    assertNotNull(state.getRequestsToCountMap().get("abc"));
+    assertEquals(1, state.getRequestsToCountMap().get("abc").get());
+
+    assertNotNull(state.getResourcesOnAHost("abc"));
+    assertEquals(0, state.getResourcesOnAHost("abc").size());
+
+    // Host-affinity is not enabled
+    ResourceRequestState state1 = new ResourceRequestState(false, manager);
+    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, 1);
+    state1.addResourceRequest(request1);
+
+    // The mock manager is shared, so it has now seen both requests.
+    assertNotNull(manager.resourceRequests);
+    assertEquals(2, manager.resourceRequests.size());
+
+
+    assertEquals(1, state1.numPendingRequests());
+
+    assertNotNull(state1.getRequestsToCountMap());
+    assertNull(state1.getRequestsToCountMap().get(ANY_HOST));
+
+  }
+
+
+  /**
+   * Test addResource() updates the state correctly, with host-affinity both
+   * disabled and enabled.
+   */
+  @Test
+  public void testAddContainer() {
+    // Add container to the ANY_HOST list when host-affinity is not enabled
+    ResourceRequestState state = new ResourceRequestState(false, manager);
+    SamzaResource resource = new SamzaResource(1, 1024, "abc", "id1");
+
+    state.addResource(resource);
+
+    assertNotNull(state.getRequestsToCountMap());
+    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
+
+    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
+    assertEquals(resource, state.getResourcesOnAHost(ANY_HOST).get(0));
+
+    // Container Allocated when there is no request in queue
+    ResourceRequestState state1 = new ResourceRequestState(true, manager);
+    SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id2");
+    state1.addResource(container1);
+
+    assertEquals(0, state1.numPendingRequests());
+
+    // With no request for "zzz", the resource is expected under ANY_HOST.
+    assertNull(state1.getResourcesOnAHost("zzz"));
+    assertNotNull(state1.getResourcesOnAHost(ANY_HOST));
+    assertEquals(1, state1.getResourcesOnAHost(ANY_HOST).size());
+    assertEquals(container1, state1.getResourcesOnAHost(ANY_HOST).get(0));
+
+    // Container Allocated on a Requested Host
+    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", 0));
+
+    assertEquals(1, state1.numPendingRequests());
+
+    assertNotNull(state1.getRequestsToCountMap());
+    assertNotNull(state1.getRequestsToCountMap().get("abc"));
+    assertEquals(1, state1.getRequestsToCountMap().get("abc").get());
+
+    state1.addResource(resource);
+
+    assertNotNull(state1.getResourcesOnAHost("abc"));
+    assertEquals(1, state1.getResourcesOnAHost("abc").size());
+    assertEquals(resource, state1.getResourcesOnAHost("abc").get(0));
+
+    // Container Allocated on host that was not requested
+    SamzaResource container2 = new SamzaResource(1, 1024, "xyz", "id2");
+
+    state1.addResource(container2);
+
+    assertNull(state1.getResourcesOnAHost("xyz"));
+    assertNotNull(state1.getResourcesOnAHost(ANY_HOST));
+    assertEquals(2, state1.getResourcesOnAHost(ANY_HOST).size());
+    assertEquals(container2, state1.getResourcesOnAHost(ANY_HOST).get(1));
+
+    // Extra containers were allocated on a host that was requested
+    SamzaResource container3 = new SamzaResource(1, 1024, "abc", "id3");
+    state1.addResource(container3);
+
+    // The surplus "abc" resource is expected to land in the ANY_HOST list.
+    assertEquals(3, state1.getResourcesOnAHost(ANY_HOST).size());
+    assertEquals(container3, state1.getResourcesOnAHost(ANY_HOST).get(2));
+  }
+
+  /**
+   * Test request state after container is assigned to a host
+   * * Assigned on requested host
+   * * Assigned on any host
+   */
+  @Test
+  public void testContainerAssignment() throws Exception {
+    // Host-affinity enabled
+    ResourceRequestState state = new ResourceRequestState(true, manager);
+    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
+
+    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", 
0);
+
+    state.addResourceRequest(request);
+    state.addResourceRequest(request1);
+
+    // One resource on a requested host ("abc"), one on an unrequested host.
+    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+
+    SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id1");
+    state.addResource(container);
+    state.addResource(container1);
+
+    // Adding resources alone does not consume the pending requests.
+    assertEquals(2, state.numPendingRequests());
+    assertEquals(2, state.getRequestsToCountMap().size());
+
+    assertNotNull(state.getResourcesOnAHost("abc"));
+    assertEquals(1, state.getResourcesOnAHost("abc").size());
+    assertEquals(container, state.getResourcesOnAHost("abc").get(0));
+
+    assertNotNull(state.getResourcesOnAHost("def"));
+    assertEquals(0, state.getResourcesOnAHost("def").size());
+
+    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
+    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
+    assertEquals(container1, state.getResourcesOnAHost(ANY_HOST).get(0));
+
+    // Container assigned on the requested host
+    state.updateStateAfterAssignment(request, "abc", container);
+
+    assertEquals(request1, state.peekPendingRequest());
+
+    assertNotNull(state.getRequestsToCountMap().get("abc"));
+    assertEquals(0, state.getRequestsToCountMap().get("abc").get());
+
+    assertNotNull(state.getResourcesOnAHost("abc"));
+    assertEquals(0, state.getResourcesOnAHost("abc").size());
+
+    // Container assigned on any host
+    state.updateStateAfterAssignment(request1, ANY_HOST, container1);
+
+    assertEquals(0, state.numPendingRequests());
+
+    assertNotNull(state.getRequestsToCountMap().get("def"));
+    assertEquals(0, state.getRequestsToCountMap().get("def").get());
+
+    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
+    assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
+
+  }
+
+
+}

Reply via email to