SAMZA-680: Inverting JobCoordinator and SamzaAppMaster logic

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/947472a0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/947472a0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/947472a0

Branch: refs/heads/master
Commit: 947472a0b471716d88b0381c37ac372bd012fcf6
Parents: 9f7abf5
Author: Jagadish Venkatramen <[email protected]>
Authored: Thu Jun 2 14:50:55 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Jun 2 14:50:55 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  17 +
 .../AbstractContainerAllocator.java             | 269 ++++++++++++
 .../ClusterBasedJobCoordinator.java             | 224 ++++++++++
 .../clustermanager/ClusterResourceManager.java  | 158 +++++++
 .../clustermanager/ContainerAllocator.java      |  55 +++
 .../clustermanager/ContainerProcessManager.java | 421 ++++++++++++++++++
 .../HostAwareContainerAllocator.java            | 107 +++++
 .../samza/clustermanager/ResourceFailure.java   |  49 +++
 .../clustermanager/ResourceManagerFactory.java  |  37 ++
 .../clustermanager/ResourceRequestState.java    | 336 +++++++++++++++
 .../clustermanager/SamzaApplicationState.java   | 124 ++++++
 .../SamzaContainerLaunchException.java          |  44 ++
 .../samza/clustermanager/SamzaResource.java     |  78 ++++
 .../clustermanager/SamzaResourceRequest.java    | 123 ++++++
 .../clustermanager/SamzaResourceStatus.java     |  91 ++++
 .../samza/config/ClusterManagerConfig.java      | 203 +++++++++
 .../apache/samza/container/LocalityManager.java |   2 +-
 .../grouper/task/TaskAssignmentManager.java     |   2 +-
 .../apache/samza/storage/StorageRecovery.java   |   4 +-
 .../samza/checkpoint/CheckpointTool.scala       |   6 +-
 .../samza/coordinator/JobCoordinator.scala      |  14 +-
 .../org/apache/samza/job/local/ProcessJob.scala |   4 +-
 .../samza/job/local/ProcessJobFactory.scala     |   4 +-
 .../samza/job/local/ThreadJobFactory.scala      |   4 +-
 .../ContainerProcessManagerMetrics.scala        |  83 ++++
 .../MockClusterResourceManager.java             |  90 ++++
 .../MockClusterResourceManagerCallback.java     |  44 ++
 .../clustermanager/MockContainerAllocator.java  |  48 +++
 .../clustermanager/MockContainerListener.java   | 145 +++++++
 .../MockContainerRequestState.java              |  91 ++++
 .../samza/clustermanager/MockHttpServer.java    |  56 +++
 .../clustermanager/TestContainerAllocator.java  | 274 ++++++++++++
 .../TestContainerProcessManager.java            | 426 +++++++++++++++++++
 .../TestContainerRequestState.java              | 198 +++++++++
 .../TestHostAwareContainerAllocator.java        | 357 ++++++++++++++++
 .../samza/container/TestSamzaContainer.scala    |  14 +-
 .../samza/coordinator/TestJobCoordinator.scala  |   9 +-
 .../apache/samza/job/local/TestProcessJob.scala |   8 +-
 .../samza/logging/log4j/StreamAppender.java     |   6 +-
 samza-shell/src/main/bash/run-am.sh             |   2 +-
 samza-shell/src/main/bash/run-jc.sh             |  25 ++
 .../apache/samza/job/yarn/SamzaAppState.java    |   8 +-
 .../apache/samza/job/yarn/SamzaTaskManager.java |   2 +-
 .../org/apache/samza/job/yarn/YarnAppState.java | 146 +++++++
 .../job/yarn/YarnClusterResourceManager.java    | 402 +++++++++++++++++
 .../samza/job/yarn/YarnContainerRunner.java     | 258 +++++++++++
 .../job/yarn/YarnResourceManagerFactory.java    |  45 ++
 .../samza/validation/YarnJobValidationTool.java |   7 +-
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |   4 +-
 .../job/yarn/SamzaYarnAppMasterLifecycle.scala  |  90 ++++
 .../job/yarn/SamzaYarnAppMasterService.scala    |  80 ++++
 .../job/yarn/TestContainerAllocatorCommon.java  |   6 +-
 .../samza/job/yarn/TestSamzaTaskManager.java    |   8 +-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |   4 +-
 .../job/yarn/TestSamzaAppMasterService.scala    |   6 +-
 55 files changed, 5255 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index fad7b55..7a09c7e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -42,6 +42,23 @@
         <allow class="org.apache.samza.Partition" />
     </subpackage>
 
+    <subpackage name="clustermanager">
+        <allow class="org.apache.samza.SamzaException" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.util" />
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.coordinator" />
+        <allow pkg="org.apache.samza.job" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.serializers" />
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.eclipse.jetty.servlet" />
+
+
+        <allow class="org.apache.samza.Partition" />
+    </subpackage>
+
+
     <subpackage name="serializers">
         <allow pkg="org.apache.samza.config" />
 

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
new file mode 100644
index 0000000..097a476
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.job.CommandBuilder;
+import org.apache.samza.job.ShellCommandBuilder;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * {@link AbstractContainerAllocator} makes requests for physical resources to the resource manager and also runs
+ * a container process on an allocated physical resource. Sub-classes should override the assignResourceRequests()
+ * method to assign resource requests according to some strategy.
+ *
+ * See {@link ContainerAllocator} and {@link HostAwareContainerAllocator} for two such strategies.
+ *
+ * This class is not thread-safe.
+ */
+
+//This class is used in the refactored code path as called by run-jc.sh
+
+public abstract class AbstractContainerAllocator implements Runnable {
+
+  private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
+
+  /* State that controls the lifecycle of the allocator thread. Cleared by stop() or when the thread is interrupted. */
+  private volatile boolean isRunning = true;
+
+  /**
+   * Config and derived config objects
+   */
+  private final TaskConfig taskConfig;
+
+  private final Config config;
+
+  /**
+   * A ClusterResourceManager for the allocator to request for resources.
+   */
+  protected final ClusterResourceManager clusterResourceManager;
+  /**
+   * The allocator sleeps for allocatorSleepIntervalMs before it polls its queue for the next request
+   */
+  protected final int allocatorSleepIntervalMs;
+  /**
+   * Each container currently has the same configuration - memory, and numCpuCores.
+   */
+  protected final int containerMemoryMb;
+  protected final int containerNumCpuCores;
+
+  /**
+   * State corresponding to num failed containers, running containers etc.
+   */
+  protected final SamzaApplicationState state;
+
+  /**
+   * ResourceRequestState indicates the state of all unfulfilled resource requests and allocated resources
+   */
+  protected final ResourceRequestState resourceRequestState;
+
+  /**
+   * Creates an allocator.
+   *
+   * @param clusterResourceManager the {@link ClusterResourceManager} used to launch StreamProcessors
+   * @param resourceRequestState   tracks pending resource requests and allocated resources
+   * @param config                 the job config; allocator settings are read through {@link ClusterManagerConfig}
+   * @param state                  the application state updated as containers are requested and launched
+   */
+  public AbstractContainerAllocator(ClusterResourceManager clusterResourceManager,
+                                    ResourceRequestState resourceRequestState,
+                                    Config config,
+                                    SamzaApplicationState state) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.clusterResourceManager = clusterResourceManager;
+    this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
+    this.resourceRequestState = resourceRequestState;
+    this.containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
+    this.containerNumCpuCores = clusterManagerConfig.getNumCores();
+    this.taskConfig = new TaskConfig(config);
+    this.state = state;
+    this.config = config;
+  }
+
+  /**
+   * Continually schedule StreamProcessors to run on resources obtained from the cluster manager.
+   * Each iteration assigns pending requests, releases extra resources and then sleeps for
+   * allocatorSleepIntervalMs ms.
+   *
+   * Terminates when the isRunning flag is cleared by {@link #stop()}, or when the allocator thread
+   * is interrupted.
+   */
+  @Override
+  public void run() {
+    while (isRunning) {
+      try {
+        assignResourceRequests();
+        // Release extra resources and update the entire system's state
+        resourceRequestState.releaseExtraResources();
+      } catch (Exception e) {
+        // Keep the allocator alive. The sleep below still runs, so a persistent failure
+        // does not turn this loop into a hot spin.
+        log.error("Got unknown Exception in AllocatorThread.", e);
+      }
+
+      try {
+        Thread.sleep(allocatorSleepIntervalMs);
+      } catch (InterruptedException e) {
+        log.warn("Got InterruptedException in AllocatorThread.", e);
+        // Restore the interrupt status. With the flag set, every later Thread.sleep would throw
+        // immediately and the loop would busy-spin, so treat the interrupt as a request to stop.
+        Thread.currentThread().interrupt();
+        isRunning = false;
+      }
+    }
+  }
+
+  /**
+   * Assign resources from the cluster manager and matches them to run container processes on them.
+   *
+   */
+  protected abstract void assignResourceRequests();
+
+  /**
+   * Updates the request state and runs a container process on the specified host. Assumes a resource
+   * is available on the preferred host, so the caller must verify that before invoking this method.
+   *
+   * If the launch fails with a {@link SamzaContainerLaunchException}, the resource is released and a
+   * new request for the same container is issued on {@link ResourceRequestState#ANY_HOST}.
+   *
+   * @param request             the {@link SamzaResourceRequest} which is being handled.
+   * @param preferredHost       the preferred host on which the StreamProcessor process should be run or
+   *                            {@link ResourceRequestState#ANY_HOST} if there is no host preference.
+   * @throws SamzaException if there is no allocated resource in the specified host.
+   */
+  protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+    int containerID = request.getContainerID();
+    CommandBuilder builder = getCommandBuilder(containerID);
+    // Get the available resource
+    SamzaResource resource = peekAllocatedResource(preferredHost);
+    if (resource == null) {
+      throw new SamzaException("Expected resource was unavailable on host " + preferredHost);
+    }
+
+    // Update state
+    resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
+
+    //run container on resource
+    log.info("Found available resources on {}. Assigning request for container_id {} with "
+            + "timestamp {} to resource {}",
+        new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()});
+    try {
+      //launches a StreamProcessor on the resource
+      clusterResourceManager.launchStreamProcessor(resource, builder);
+
+      if (state.neededResources.decrementAndGet() == 0) {
+        state.jobHealthy.set(true);
+      }
+      state.runningContainers.put(containerID, resource);
+
+    } catch (SamzaContainerLaunchException e) {
+      log.warn(String.format("Got exception while starting resource %s. Requesting a new resource on any host", resource), e);
+      resourceRequestState.releaseUnstartableContainer(resource);
+      requestResource(containerID, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+  /**
+   * Called during initial request for resources
+   *
+   * @param resourceToHostMappings A Map of [containerId, hostName] containerId is the ID of the container process
+   *                               to run on the resource. hostName is the host on which the resource must be allocated.
+   *                                The hostName value is null, either
+   *                                - when host-affinity is not enabled, or
+   *                                - when host-affinity is enabled and job is run for the first time
+   */
+  public void requestResources(Map<Integer, String> resourceToHostMappings) {
+    for (Map.Entry<Integer, String> entry : resourceToHostMappings.entrySet()) {
+      int containerId = entry.getKey();
+      String preferredHost = entry.getValue();
+      if (preferredHost == null) {
+        // No known host for this container: accept a resource anywhere.
+        preferredHost = ResourceRequestState.ANY_HOST;
+      }
+
+      requestResource(containerId, preferredHost);
+    }
+  }
+
+  /**
+   * Checks if this allocator has a pending resource request.
+   * @return {@code true} if there is a pending request, {@code false} otherwise.
+   */
+  protected final boolean hasPendingRequest() {
+    return peekPendingRequest() != null;
+  }
+
+  /**
+   * Retrieves, but does not remove, the next pending request in the queue.
+   *
+   * @return  the pending request or {@code null} if there is no pending request.
+   */
+  protected final SamzaResourceRequest peekPendingRequest() {
+    return resourceRequestState.peekPendingRequest();
+  }
+
+  /**
+   * Method to request a resource from the cluster manager
+   *
+   * @param containerID Identifier of the container that will be run when a resource is allocated for
+   *                            this request
+   * @param preferredHost Name of the host that you prefer to run the container on
+   */
+  public final void requestResource(int containerID, String preferredHost) {
+    SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
+        preferredHost, containerID);
+    resourceRequestState.addResourceRequest(request);
+    state.containerRequests.incrementAndGet();
+  }
+
+  /**
+   * Returns true if there are resources allocated on a host.
+   * @param host  the host for which a resource is needed.
+   * @return      {@code true} if there is a resource allocated for the specified host, {@code false} otherwise.
+   */
+  protected boolean hasAllocatedResource(String host) {
+    return peekAllocatedResource(host) != null;
+  }
+
+  /**
+   * Retrieves, but does not remove, the first allocated resource on the specified host.
+   *
+   * @param host  the host on which a resource is needed.
+   * @return      the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one.
+   */
+  protected SamzaResource peekAllocatedResource(String host) {
+    return resourceRequestState.peekResource(host);
+  }
+
+  /**
+   * Returns a command builder with the build environment configured with the containerId.
+   * @param samzaContainerId to configure the builder with.
+   * @return the constructed builder object
+   */
+  private CommandBuilder getCommandBuilder(int samzaContainerId) {
+    String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
+    CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
+
+    cmdBuilder.setConfig(config).setId(samzaContainerId).setUrl(state.jobModelManager.server().getUrl());
+    return cmdBuilder;
+  }
+
+  /**
+   * Adds allocated samzaResource to a synchronized buffer of allocated resources.
+   * See allocatedResources in {@link ResourceRequestState}
+   *
+   * @param samzaResource returned by the ContainerManager
+   */
+  public final void addResource(SamzaResource samzaResource) {
+    resourceRequestState.addResource(samzaResource);
+  }
+
+  /**
+   * Stops the Allocator. Setting this flag to false, exits the allocator loop.
+   */
+  public void stop() {
+    isRunning = false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
new file mode 100644
index 0000000..d0d4e34
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.metrics.JmxServer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implements a JobCoordinator that is completely independent of the 
underlying cluster
+ * manager system. This {@link ClusterBasedJobCoordinator} handles 
functionality common
+ * to both Yarn and Mesos. It takes care of
+ *  1. Requesting resources from an underlying {@link ClusterResourceManager}.
+ *  2. Ensuring that placement of containers to resources happens (as per 
whether host affinity
+ *  is configured or not).
+ *
+ *  Any offer based cluster management system that must integrate with Samza 
will merely
+ *  implement a {@link ResourceManagerFactory} and a {@link 
ClusterResourceManager}.
+ *
+ *  This class is not thread-safe. For safe access in multi-threaded context, 
invocations
+ *  should be synchronized by the callers.
+ *
+ * TODO:
+ * 1. Refactor ClusterResourceManager to also handle process liveness, process 
start
+ * callbacks
+ * 2. Refactor the JobModelReader to be an interface.
+ * 3. Make ClusterBasedJobCoordinator implement the JobCoordinator API as in 
SAMZA-881.
+ * 4. Refactor UI state variables.
+ * 5. Unit tests.
+ * 6. Document newly added configs.
+ */
+public class ClusterBasedJobCoordinator {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
+
+  private final Config config;
+
+  private final ClusterManagerConfig clusterManagerConfig;
+
+  /**
+   * State to track container failures, host-container mappings
+   */
+  private final SamzaApplicationState state;
+
+  /**
+   * Metrics to track stats around container failures, needed containers etc.
+   */
+
+  //even though some of these can be converted to local variables, it will not 
be the case
+  //as we add more methods to the JobCoordinator and completely implement 
SAMZA-881.
+
+  /**
+   * Handles callback for allocated containers, failed containers.
+   */
+  private final ContainerProcessManager containerProcessManager;
+
+  /**
+   * A JobModelManager to return and refresh the {@link 
org.apache.samza.job.model.JobModel} when required.
+   */
+  private final JobModelManager jobModelManager;
+
+  /*
+   * The interval for polling the Task Manager for shutdown.
+   */
+  private final long jobCoordinatorSleepInterval;
+
+  /*
+   * Config specifies if a Jmx server should be started on this Job Coordinator
+   */
+  private final boolean isJmxEnabled;
+
+  /**
+   * Internal boolean to check if the job coordinator has already been started.
+   */
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  private JmxServer jmxServer;
+
+
+  /**
+   * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke 
run() to actually
+   * run the jobcoordinator.
+   *
+   * @param coordinatorSystemConfig the coordinator stream config that can be 
used to read the
+   *                                {@link 
org.apache.samza.job.model.JobModel} from.
+   */
+  public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
+
+    MetricsRegistryMap registry = new MetricsRegistryMap();
+
+    //build a JobModelReader and perform partition assignments.
+    jobModelManager = buildJobModelManager(coordinatorSystemConfig, registry);
+    config = jobModelManager.jobModel().getConfig();
+    state = new SamzaApplicationState(jobModelManager);
+    clusterManagerConfig = new ClusterManagerConfig(config);
+    isJmxEnabled = clusterManagerConfig.getJmxEnabled();
+
+    jobCoordinatorSleepInterval = 
clusterManagerConfig.getJobCoordinatorSleepInterval();
+
+    // build a container process Manager
+    containerProcessManager = new ContainerProcessManager(config, state, 
registry);
+  }
+
+
+  /**
+   * Starts the JobCoordinator.
+   *
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      log.info("Attempting to start an already started job coordinator. ");
+      return;
+    }
+    // set up JmxServer (if jmx is enabled)
+    if (isJmxEnabled) {
+      jmxServer = new JmxServer();
+      state.jmxUrl = jmxServer.getJmxUrl();
+      state.jmxTunnelingUrl = jmxServer.getTunnelingJmxUrl();
+    } else {
+      jmxServer = null;
+    }
+
+    try {
+      //initialize JobCoordinator state
+      log.info("Starting Cluster Based Job Coordinator");
+
+      containerProcessManager.start();
+
+      boolean isInterrupted = false;
+
+      while (!containerProcessManager.shouldShutdown() && !isInterrupted) {
+        try {
+          Thread.sleep(jobCoordinatorSleepInterval);
+        } catch (InterruptedException e) {
+          isInterrupted = true;
+          log.error("Interrupted in job coordinator loop {} ", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    } catch (Throwable e) {
+      log.error("Exception thrown in the JobCoordinator loop {} ", e);
+      throw new SamzaException(e);
+    } finally {
+      onShutDown();
+    }
+  }
+
+  /**
+   * Stops all components of the JobCoordinator.
+   */
+  private void onShutDown() {
+
+    if (containerProcessManager != null) {
+      try {
+        containerProcessManager.stop();
+      } catch (Throwable e) {
+        log.error("Exception while stopping task manager {}", e);
+      }
+      log.info("Stopped task manager");
+    }
+
+    if (jmxServer != null) {
+      try {
+        jmxServer.stop();
+        log.info("Stopped Jmx Server");
+      } catch (Throwable e) {
+        log.error("Exception while stopping jmx server {}", e);
+      }
+    }
+  }
+
+  private JobModelManager buildJobModelManager(Config coordinatorSystemConfig, 
MetricsRegistryMap registry)  {
+    JobModelManager jobModelManager = 
JobModelManager.apply(coordinatorSystemConfig, registry);
+    return jobModelManager;
+  }
+
+
+
+  /**
+   * The entry point for the {@link ClusterBasedJobCoordinator}
+   * @param args args
+   */
+  public static void main(String[] args) {
+    Config coordinatorSystemConfig = null;
+    final String coordinatorSystemEnv = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
+    try {
+      //Read and parse the coordinator system config.
+      log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+      coordinatorSystemConfig = new 
MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, 
Config.class));
+    } catch (IOException e) {
+      log.error("Exception while reading coordinator stream config {}", e);
+      throw new SamzaException(e);
+    }
+    ClusterBasedJobCoordinator jc = new 
ClusterBasedJobCoordinator(coordinatorSystemConfig);
+    jc.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
new file mode 100644
index 0000000..715cf66
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.job.CommandBuilder;
+
+import java.util.List;
+
+/*
+ * TODO:
+ * 1.Investigate what it means to kill a StreamProcessor, and add it as an API here.
+ * 2.Consider an API for Container Process liveness - ie, to be notified when a StreamProcessor
+ * joins or leaves the group. Will evolve more as we implement standalone and mesos.
+ */
+
+/**
+ * <code>ClusterResourceManager</code> handles communication with a cluster manager
+ * and provides updates on events such as resource allocations and
+ * completions. Any offer-based resource management system that integrates with Samza
+ * will provide an implementation of a ClusterResourceManager API.
+ *
+ * This class is meant to be used by implementing a {@link Callback}:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements ClusterResourceManager.Callback {
+ *   public void onResourcesAvailable(List<SamzaResource> resources) {
+ *     [launch a streamprocessor on the resources]
+ *   }
+ *
+ *   public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatus) {
+ *     [check for exit code to examine diagnostics, and take actions]
+ *   }
+ *
+ *   public void onError(Throwable error) {
+ *     [stop the container process manager]
+ *   }
+ *
+ * }
+ * }
+ * </pre>
+ *
+ * The lifecycle of a ClusterResourceManager (through a concrete subclass, since this class is
+ * abstract) should be managed similarly to the following:
+ *
+ * <pre>
+ * {@code
+ * ClusterResourceManager processManager =
+ *     new MyClusterResourceManager(callback); // any concrete subclass
+ * processManager.start();
+ * [... request resources ...]
+ * [... wait for application to complete ...]
+ * processManager.stop(status);
+ * }
+ * </pre>
+ */
+public abstract class ClusterResourceManager {
+
+  /**
+   * The callback notified of resource allocations, completions and errors.
+   */
+  protected final Callback clusterManagerCallback;
+
+  /**
+   * @param callback the {@link Callback} that receives notifications from this ClusterResourceManager
+   */
+  public ClusterResourceManager(Callback callback) {
+    this.clusterManagerCallback = callback;
+  }
+
+  /**
+   * Starts the ClusterResourceManager. Expected to be called before any resources are requested.
+   */
+  public abstract void start();
+
+  /**
+   * Request resources for running container processes
+   * @param resourceRequest the resourceRequest being made
+   */
+  public abstract void requestResources(SamzaResourceRequest resourceRequest);
+
+  /**
+   * Remove a previously submitted resource request. The previous resource request may
+   * have been submitted to the cluster manager. Even after the remove request, a {@link Callback}
+   * implementation must be prepared to receive an allocation for the previous request.
+   * This is merely a best effort cancellation.
+   *
+   * @param request the resource request that must be cancelled
+   */
+  public abstract void cancelResourceRequest(SamzaResourceRequest request);
+
+
+  /**
+   * If the app cannot use the resource or wants to give up the resource, it can release them.
+   * @param resource the resource to be released
+   */
+  public abstract void releaseResources(SamzaResource resource);
+
+  /**
+   * Requests the launch of a StreamProcessor with the specified context on the resource.
+   * @param resource the specified resource
+   * @param builder A builder implementation that encapsulates the context for the
+   *                StreamProcessor. A builder encapsulates the ID for the processor, the
+   *                build environment, the command to execute etc.
+   * @throws SamzaContainerLaunchException  when there's an error during the requesting launch.
+   *
+   */
+  public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException;
+
+
+  /**
+   * Stops the ClusterResourceManager.
+   * @param status the {@link SamzaApplicationState.SamzaAppStatus} of the application at the time of stopping
+   */
+  public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
+
+
+  /**
+   * Defines a callback interface for interacting with notifications from a ClusterResourceManager
+   */
+  public interface Callback {
+
+    /**
+     * This callback is invoked when there are resources that are to be offered to the application.
+     * Often, resources that an app requests may not be available. The application must be prepared
+     * to handle callbacks for resources that it did not request.
+     * @param resources that are offered to the application
+     */
+    void onResourcesAvailable(List<SamzaResource> resources);
+
+    /**
+     * This callback is invoked when resources are no longer available to the application. A
+     * resource could be marked 'completed' in scenarios like - failure of disk on the host,
+     * pre-emption of the resource to run another StreamProcessor, exit or termination of the
+     * StreamProcessor running in the resource.
+     *
+     * The SamzaResourceStatus contains diagnostics on why the failure occurred
+     * @param resources statuses for the resources that were completed.
+     */
+    void onResourcesCompleted(List<SamzaResourceStatus> resources);
+
+    /**
+     * This callback is invoked when there is an error in the ClusterResourceManager. This is
+     * guaranteed to be invoked when there is an uncaught exception in any other
+     * ClusterResourceManager callbacks.
+     * @param e  the underlying Throwable that was thrown.
+     */
+    void onError(Throwable e);
+  }
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
new file mode 100644
index 0000000..5997a7a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the default allocator that will be used by ContainerProcessManager.
+ *
+ * When host-affinity is not enabled, this periodically wakes up to assign a 
container to *ANY* allocated resource.
+ * If there aren't enough containers, it waits by sleeping for {@code 
allocatorSleepIntervalMs} milliseconds.
+ */
+//This class is used in the refactored code path as called by run-jc.sh
+
+public class ContainerAllocator extends AbstractContainerAllocator {
+  private static final Logger log = 
LoggerFactory.getLogger(ContainerAllocator.class);
+
+  public ContainerAllocator(ClusterResourceManager manager,
+                            Config config, SamzaApplicationState state) {
+    super(manager, new ResourceRequestState(false, manager), config, state);
+  }
+
+  /**
+   * During the run() method, the thread sleeps for allocatorSleepIntervalMs 
ms. It then invokes assignResourceRequests,
+   * and tries to allocate any unsatisfied request that is still in the 
request queue {@link ResourceRequestState})
+   * with allocated resources, if any.
+   *
+   * Since host-affinity is not enabled, all allocated resources are buffered 
in the list keyed by "ANY_HOST".
+   * */
+  @Override
+  public void assignResourceRequests() {
+    while (hasPendingRequest() && 
hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
+      SamzaResourceRequest request = peekPendingRequest();
+      runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
new file mode 100644
index 0000000..f2db34c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.metrics.ContainerProcessManagerMetrics;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ContainerProcessManager is responsible for requesting containers, handling 
failures, and notifying the application master that the
+ * job is done.
+ *
+ * The following main threads are involved in the execution of the 
ContainerProcessManager :
+ *  - The main thread (defined in SamzaAppMaster) that sends requests to the 
cluster manager.
+ *  - The callback handler thread that receives the responses from cluster 
manager and handles:
+ *      - Populating a buffer when a container is allocated by the cluster 
manager
+ *        (allocatedContainers in {@link ResourceRequestState}
+ *      - Identifying the cause of container failure and re-request containers 
from the cluster manager by adding request to the
+ *        internal requestQueue in {@link ResourceRequestState}
+ *  - The allocator thread defined here assigns the allocated containers to 
pending requests
+ *    (See {@link org.apache.samza.clustermanager.ContainerAllocator} or 
{@link org.apache.samza.clustermanager.HostAwareContainerAllocator})
+ *
+ */
+
+public class ContainerProcessManager implements 
ClusterResourceManager.Callback   {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ContainerProcessManager.class);
+  /**
+   * Does this Samza Job need hostAffinity when containers are allocated.
+   */
+  private final boolean hostAffinityEnabled;
+
+  /**
+   * State variables tracking containers allocated, freed, running, released.
+   */
+  private final SamzaApplicationState state;
+
+  /**
+   * Config for this Samza job
+   */
+  private final ClusterManagerConfig clusterManagerConfig;
+  private final JobConfig jobConfig;
+
+  /**
+   * The Allocator matches requests to resources and executes processes.
+   */
+  private final AbstractContainerAllocator containerAllocator;
+  private final Thread allocatorThread;
+
+  /**
+   * A standard interface to request resources.
+   */
+  private final ClusterResourceManager clusterResourceManager;
+
+  /**
+   * If there are too many failed container failures (configured by 
job.container.retry.count) for a
+   * container, the job exits.
+   */
+  private volatile boolean tooManyFailedContainers = false;
+
+  private volatile Throwable exceptionOccurred = null;
+
+
+  /**
+   * A map that keeps track of how many times each container failed. The key 
is the container ID, and the
+   * value is the {@link ResourceFailure} object that has a count of failures.
+   *
+   */
+  private final Map<Integer, ResourceFailure> containerFailures = new 
HashMap<>();
+
+  private final ContainerProcessManagerMetrics metrics;
+
+
+  public ContainerProcessManager(Config config,
+                                 SamzaApplicationState state,
+                                 MetricsRegistryMap registry) {
+    this.state = state;
+    this.clusterManagerConfig = new ClusterManagerConfig(config);
+    this.jobConfig = new JobConfig(config);
+
+    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+
+    ResourceManagerFactory factory = 
getContainerProcessManagerFactory(clusterManagerConfig);
+    this.clusterResourceManager = factory.getClusterResourceManager(this, 
state);
+    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+
+    if (this.hostAffinityEnabled) {
+      this.containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, 
clusterManagerConfig.getContainerRequestTimeout(), config, state);
+    } else {
+      this.containerAllocator = new ContainerAllocator(clusterResourceManager, 
config, state);
+    }
+
+    this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    log.info("finished initialization of samza task manager");
+
+  }
+
+  //package private, used only in tests
+  ContainerProcessManager(Config config,
+                          SamzaApplicationState state,
+                          MetricsRegistryMap registry,
+                          ClusterResourceManager resourceManager) {
+    JobModelManager jobModelManager = state.jobModelManager;
+    this.state = state;
+    this.clusterManagerConfig = new ClusterManagerConfig(config);
+    this.jobConfig = new JobConfig(config);
+
+    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+
+    this.clusterResourceManager = resourceManager;
+    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+
+
+    if (this.hostAffinityEnabled) {
+      this.containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, 
clusterManagerConfig.getContainerRequestTimeout(), config, state);
+    } else {
+      this.containerAllocator = new ContainerAllocator(clusterResourceManager, 
config, state);
+    }
+
+    this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    log.info("finished initialization of samza task manager");
+
+  }
+
+  public boolean shouldShutdown() {
+    log.info(" TaskManager state: Too many FailedContainers: {} No. Completed 
containers: {} Num Configured containers: {}" +
+      " AllocatorThread liveness: {} ", new Object[]{tooManyFailedContainers, 
state.completedContainers.get(), state.containerCount, 
allocatorThread.isAlive()});
+
+    if (exceptionOccurred != null) {
+      log.error("Exception in ContainerProcessManager", exceptionOccurred);
+      throw new SamzaException(exceptionOccurred);
+    }
+    return tooManyFailedContainers || state.completedContainers.get() == 
state.containerCount.get() || !allocatorThread.isAlive();
+  }
+
+  public void start() {
+    metrics.start();
+
+    log.info("Starting Container Process Manager");
+    clusterResourceManager.start();
+
+    log.info("Starting the Samza task manager");
+    final int containerCount = jobConfig.getContainerCount();
+
+    state.containerCount.set(containerCount);
+    state.neededResources.set(containerCount);
+
+    // Request initial set of containers
+    Map<Integer, String> containerToHostMapping = 
state.jobModelManager.jobModel().getAllContainerLocality();
+
+    containerAllocator.requestResources(containerToHostMapping);
+
+    // Start container allocator thread
+    log.info("Starting the container allocator thread");
+    allocatorThread.start();
+  }
+
+  public void stop() {
+    log.info("Invoked stop of the Samza container process manager");
+
+    // Shutdown allocator thread
+    containerAllocator.stop();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      log.error("Allocator Thread join() threw an interrupted exception", ie);
+      Thread.currentThread().interrupt();
+    }
+
+    if (metrics != null) {
+      try {
+        metrics.stop();
+      } catch (Throwable e) {
+        log.error("Exception while stopping metrics {}", e);
+      }
+      log.info("Stopped metrics reporters");
+    }
+
+    if (clusterResourceManager != null) {
+      try {
+        clusterResourceManager.stop(state.status);
+      } catch (Throwable e) {
+        log.error("Exception while stopping cluster resource manager {}", e);
+      }
+      log.info("Stopped cluster resource manager");
+    }
+
+    log.info("Finished stop of Container process manager");
+
+  }
+
+  public void onResourceAllocated(SamzaResource container) {
+    log.info("Container allocated from RM on " + container.getHost());
+    containerAllocator.addResource(container);
+  }
+
+  /**
+   * This methods handles the onResourceCompleted callback from the RM. Based 
on the ContainerExitStatus, it decides
+   * whether a container that exited is marked as complete or failure.
+   * @param containerStatus of the resource that completed
+   */
+  public void onResourceCompleted(SamzaResourceStatus containerStatus) {
+    String containerIdStr = containerStatus.getResourceID();
+    int containerId = -1;
+    for (Map.Entry<Integer, SamzaResource> entry: 
state.runningContainers.entrySet()) {
+      if 
(entry.getValue().getResourceID().equals(containerStatus.getResourceID())) {
+        log.info("Matching container ID found " + entry.getKey() + " " + 
entry.getValue());
+
+        containerId = entry.getKey();
+        break;
+      }
+    }
+    if (containerId == -1) {
+      log.info("No matching container id found for " + 
containerStatus.toString());
+    }
+    state.runningContainers.remove(containerId);
+
+    int exitStatus = containerStatus.getExitCode();
+    switch (exitStatus) {
+      case SamzaResourceStatus.SUCCESS:
+        log.info("Container {} completed successfully.", containerIdStr);
+
+        state.completedContainers.incrementAndGet();
+
+        if (containerId != -1) {
+          state.finishedContainers.add(containerId);
+          containerFailures.remove(containerId);
+        }
+
+        if (state.completedContainers.get() == state.containerCount.get()) {
+          log.info("Setting job status to SUCCEEDED, since all containers have 
been marked as completed.");
+          state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
+        }
+        break;
+
+      case SamzaResourceStatus.DISK_FAIL:
+      case SamzaResourceStatus.ABORTED:
+      case SamzaResourceStatus.PREEMPTED:
+        log.info("Got an exit code of {}. This means that container {} was "
+                  + "killed by YARN, either due to being released by the 
application "
+                  + "master or being 'lost' due to node failures etc. or due 
to preemption by the RM",
+                exitStatus,
+                containerIdStr);
+
+        state.releasedContainers.incrementAndGet();
+
+        // If this container was assigned some partitions (a containerId), then
+        // clean up, and request a refactor container for the tasks. This only
+        // should happen if the container was 'lost' due to node failure, not
+        // if the AM released the container.
+        if (containerId != -1) {
+          log.info("Released container {} was assigned task group ID {}. 
Requesting a refactor container for the task group.", containerIdStr, 
containerId);
+
+          state.neededResources.incrementAndGet();
+          state.jobHealthy.set(false);
+
+          // request a container on refactor host
+          containerAllocator.requestResource(containerId, 
ResourceRequestState.ANY_HOST);
+        }
+        break;
+
+      default:
+        // TODO: Handle failure more intelligently. Should track NodeFailures!
+        log.info("Container failed for some reason. Let's start it again");
+        log.info("Container " + containerIdStr + " failed with exit code . " + 
exitStatus + " - " + containerStatus.getDiagnostics() + " containerID is " + 
containerId);
+
+        state.failedContainers.incrementAndGet();
+        state.failedContainersStatus.put(containerIdStr, containerStatus);
+        state.jobHealthy.set(false);
+
+        if (containerId != -1) {
+          state.neededResources.incrementAndGet();
+          // Find out previously running container location
+          String lastSeenOn = 
state.jobModelManager.jobModel().getContainerToHostValue(containerId, 
SetContainerHostMapping.HOST_KEY);
+          if (!hostAffinityEnabled || lastSeenOn == null) {
+            lastSeenOn = ResourceRequestState.ANY_HOST;
+          }
+          log.info("Container was last seen on " + lastSeenOn);
+          // A container failed for an unknown reason. Let's check to see if
+          // we need to shutdown the whole app master if too many container
+          // failures have happened. The rules for failing are that the
+          // failure count for a task group id must be > the configured retry
+          // count, and the last failure (the one prior to this one) must have
+          // happened less than retry window ms ago. If retry count is set to
+          // 0, the app master will fail on any container failure. If the
+          // retry count is set to a number < 0, a container failure will
+          // never trigger an app master failure.
+          int retryCount = clusterManagerConfig.getContainerRetryCount();
+          int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+
+          if (retryCount == 0) {
+            log.error("Container ID {} ({}) failed, and retry count is set to 
0, so shutting down the application master, and marking the job as failed.", 
containerId, containerIdStr);
+
+            tooManyFailedContainers = true;
+          } else if (retryCount > 0) {
+            int currentFailCount;
+            long lastFailureTime;
+            if (containerFailures.containsKey(containerId)) {
+              ResourceFailure failure = containerFailures.get(containerId);
+              currentFailCount = failure.getCount() + 1;
+              lastFailureTime = failure.getLastFailure();
+            } else {
+              currentFailCount = 1;
+              lastFailureTime = 0L;
+            }
+            if (currentFailCount >= retryCount) {
+              long lastFailureMsDiff = System.currentTimeMillis() - 
lastFailureTime;
+
+              if (lastFailureMsDiff < retryWindowMs) {
+                log.error("Container ID " + containerId + "(" + containerIdStr 
+ ") has failed " + currentFailCount +
+                        " times, with last failure " + lastFailureMsDiff + "ms 
ago. This is greater than retry count of " +
+                        retryCount + " and window of " + retryWindowMs + "ms , 
so shutting down the application master, and marking the job as failed.");
+
+                // We have too many failures, and we're within the window
+                // boundary, so reset shut down the app master.
+                tooManyFailedContainers = true;
+                state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+              } else {
+                log.info("Resetting fail count for container ID {} back to 1, 
since last container failure ({}) for " +
+                        "this container ID was outside the bounds of the retry 
window.", containerId, containerIdStr);
+
+                // Reset counter back to 1, since the last failure for this
+                // container happened outside the window boundary.
+                containerFailures.put(containerId, new ResourceFailure(1, 
System.currentTimeMillis()));
+              }
+            } else {
+              log.info("Current fail count for container ID {} is {}.", 
containerId, currentFailCount);
+              containerFailures.put(containerId, new 
ResourceFailure(currentFailCount, System.currentTimeMillis()));
+            }
+          }
+
+          if (!tooManyFailedContainers) {
+            log.info("Requesting a refactor container ");
+            // Request a refactor container
+            containerAllocator.requestResource(containerId, lastSeenOn);
+          }
+        }
+
+    }
+  }
+
+  @Override
+  public void onResourcesAvailable(List<SamzaResource> resources) {
+    for (SamzaResource resource : resources) {
+      onResourceAllocated(resource);
+    }
+
+  }
+
+  @Override
+  public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatuses) 
{
+    for (SamzaResourceStatus resourceStatus : resourceStatuses) {
+      onResourceCompleted(resourceStatus);
+    }
+  }
+
+  /**
+   * An error in the callback terminates the JobCoordinator
+   * @param e the underlying exception/error
+   */
+  @Override
+  public void onError(Throwable e) {
+    log.error("Exception occured in callbacks in the Container Manager : {}", 
e);
+    exceptionOccurred = e;
+  }
+
+  /**
+   * Returns an instantiated {@link ResourceManagerFactory} from a {@link 
ClusterManagerConfig}. The
+   * {@link ResourceManagerFactory} is used to return an implementation of a 
{@link ClusterResourceManager}
+   *
+   * @param clusterManagerConfig, the cluster manager config to parse.
+   *
+   */
+  private ResourceManagerFactory getContainerProcessManagerFactory(final 
ClusterManagerConfig clusterManagerConfig) {
+    final String containerManagerFactoryClass = 
clusterManagerConfig.getContainerManagerClass();
+    final ResourceManagerFactory factory;
+
+    try {
+      factory = (ResourceManagerFactory) 
Class.forName(containerManagerFactoryClass).newInstance();
+    } catch (InstantiationException e) {
+      log.error("Instantiation exception when creating ContainerManager", e);
+      throw new SamzaException(e);
+    } catch (IllegalAccessException e) {
+      log.error("Illegal access exception when creating ContainerManager", e);
+      throw new SamzaException(e);
+    } catch (ClassNotFoundException e) {
+      log.error("ClassNotFound Exception when creating ContainerManager", e);
+      throw new SamzaException(e);
+    }
+    return factory;
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
new file mode 100644
index 0000000..da73049
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the allocator thread that will be used by ContainerProcessManager 
when host-affinity is enabled for a job. It is similar
+ * to {@link ContainerAllocator}, except that it considers locality for 
allocation.
+ *
+ * In case of host-affinity, each request ({@link SamzaResourceRequest} 
encapsulates the identifier of the container
+ * to be run and a "preferredHost". preferredHost is determined by the 
locality mappings in the coordinator stream.
+ * This thread periodically wakes up and makes the best-effort to assign a 
container to the preferredHost. If the
+ * preferredHost is not returned by the cluster manager before the 
corresponding container expires, the thread
+ * assigns the container to any other host that is allocated next.
+ *
+ * The container expiry is determined by CONTAINER_REQUEST_TIMEOUT and is 
configurable on a per-job basis.
+ *
+ * If there aren't enough containers, it waits by sleeping for 
allocatorSleepIntervalMs milliseconds.
+ */
+//This class is used in the refactored code path as called by run-jc.sh
+
+public class HostAwareContainerAllocator extends AbstractContainerAllocator {
+  private static final Logger log = 
LoggerFactory.getLogger(HostAwareContainerAllocator.class);
+  /**
+   * Tracks the expiration of a request for resources.
+   */
+  private final int requestTimeout;
+
+  public HostAwareContainerAllocator(ClusterResourceManager manager ,
+                                     int timeout, Config config, 
SamzaApplicationState state) {
+    super(manager, new ResourceRequestState(true, manager), config, state);
+    this.requestTimeout = timeout;
+  }
+
+  /**
+   * Since host-affinity is enabled, all allocated resources are buffered in 
the list keyed by "preferredHost".
+   *
+   * If the requested host is not available, the thread checks to see if the 
request has expired.
+   * If it has expired, it runs the container with expectedContainerID on one 
of the available resources from the
+   * allocatedContainers buffer keyed by "ANY_HOST".
+   */
+  @Override
+  public void assignResourceRequests()  {
+    while (hasPendingRequest()) {
+      SamzaResourceRequest request = peekPendingRequest();
+      log.info("Handling request: " + request.getContainerID() + " " + 
request.getRequestTimestampMs() + " " + request.getPreferredHost());
+      String preferredHost = request.getPreferredHost();
+      int containerID = request.getContainerID();
+
+      if (hasAllocatedResource(preferredHost)) {
+        // Found allocated container at preferredHost
+        log.info("Found a matched-container {} on the preferred host. Running 
on {}", containerID, preferredHost);
+        runStreamProcessor(request, preferredHost);
+        state.matchedResourceRequests.incrementAndGet();
+      } else {
+        log.info("Did not find any allocated resources on preferred host {} 
for running container id {}",
+            preferredHost, containerID);
+
+        boolean expired = requestExpired(request);
+        boolean resourceAvailableOnAnyHost = 
hasAllocatedResource(ResourceRequestState.ANY_HOST);
+
+        if (expired && resourceAvailableOnAnyHost) {
+          log.info("Request expired. running on ANY_HOST");
+          runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+        } else {
+          log.info("Either the request timestamp {} is greater than resource 
request timeout {}ms or we couldn't "
+                  + "find any free allocated resources in the buffer. Breaking 
out of loop.",
+              request.getRequestTimestampMs(), requestTimeout);
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks if a request has expired.
+   * @param request
+   * @return
+   */
+  private boolean requestExpired(SamzaResourceRequest request) {
+    long currTime = System.currentTimeMillis();
+    boolean requestExpired =  currTime - request.getRequestTimestampMs() > 
requestTimeout;
+    if (requestExpired) {
+      log.info("Request {} with currTime {} has expired", request, currTime);
+    }
+    return requestExpired;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java
new file mode 100644
index 0000000..36e4b6f
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
/**
 * Bookkeeping for the failures of one resource: how many times it has failed and when the most recent
 * failure happened. Both values are fixed at construction.
 */
public class ResourceFailure {

  /** How many times the container has failed. */
  private final int count;

  /**
   * Time of the most recent failure. Callers in this package pass {@code System.currentTimeMillis()}
   * (epoch milliseconds). No validation is done, so {@code null} is stored as-is.
   */
  private final Long lastFailure;

  /**
   * @param count       number of failures observed so far
   * @param lastFailure time of the most recent failure
   */
  public ResourceFailure(int count, Long lastFailure) {
    this.lastFailure = lastFailure;
    this.count = count;
  }

  /** @return number of failures observed so far */
  public int getCount() {
    return this.count;
  }

  /** @return time of the most recent failure, exactly as supplied to the constructor */
  public Long getLastFailure() {
    return this.lastFailure;
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
new file mode 100644
index 0000000..fc5636d
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+
+/**
+ * A factory to build a {@link ClusterResourceManager}
+ * //TODO: move the class to Samza-API?
+ */
+public interface ResourceManagerFactory {
+  /**
+   * Return a {@link ClusterResourceManager }
+   * @param callback to be registered with the {@link ClusterResourceManager}
+   * @param state Useful if the ClusterResourceManager wants to host an UI.
+   * //TODO: Remove the SamzaAppState param and refactor into a smaller 
focussed class.
+   * //TODO: Investigate the possibility a common Samza UI for all cluster 
managers - Yarn,Mesos,Standalone
+   * @return the instantiated {@link ClusterResourceManager}
+   */
+  public ClusterResourceManager 
getClusterResourceManager(ClusterResourceManager.Callback callback, 
SamzaApplicationState state);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
new file mode 100644
index 0000000..39897c7
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link ResourceRequestState} maintains the state variables for all the 
resource requests and the allocated resources returned
+ * by the cluster manager.
+ *
+ * This class is thread-safe, and can safely support concurrent accesses 
without any form of external synchronization. Currently,
+ * this state is shared across both the Allocator Thread, and the Callback 
handler thread.
+ *
+ */
+public class ResourceRequestState {
+  private static final Logger log = 
LoggerFactory.getLogger(ResourceRequestState.class);
+  public static final String ANY_HOST = "ANY_HOST";
+
+  /**
+   * Maintain a map of hostname to a list of resources allocated on this host
+   */
+  private final Map<String, List<SamzaResource>> allocatedResources = new 
HashMap<>();
+  /**
+   * Represents the queue of resource requests made by the {@link 
ContainerProcessManager}
+   */
+  private final PriorityQueue<SamzaResourceRequest> requestsQueue = new 
PriorityQueue<SamzaResourceRequest>();
+  /**
+   * Maintain a map of hostname to the number of requests made for resources 
on this host
+   * This state variable is used to look-up whether an allocated resource on a 
host was ever requested in the past.
+   * This map is not updated when host-affinity is not enabled
+   */
+  private final Map<String, AtomicInteger> requestsToCountMap = new 
HashMap<>();
+  /**
+   * Indicates whether host-affinity is enabled or not
+   */
+  private final boolean hostAffinityEnabled;
+
+  private final ClusterResourceManager manager;
+
+  private final Object lock = new Object();
+
+  public ResourceRequestState(boolean hostAffinityEnabled, 
ClusterResourceManager manager) {
+    this.hostAffinityEnabled = hostAffinityEnabled;
+    this.manager = manager;
+  }
+
+  /**
+   * Enqueues a {@link SamzaResourceRequest} to be sent to a {@link 
ClusterResourceManager}.
+   *
+   * @param request {@link SamzaResourceRequest} to be queued
+   */
+  public void addResourceRequest(SamzaResourceRequest request) {
+    synchronized (lock) {
+      requestsQueue.add(request);
+      String preferredHost = request.getPreferredHost();
+
+      // if host affinity is enabled, update state.
+      if (hostAffinityEnabled) {
+        //increment # of requests on the host.
+        if (requestsToCountMap.containsKey(preferredHost)) {
+          requestsToCountMap.get(preferredHost).incrementAndGet();
+        } else {
+          requestsToCountMap.put(preferredHost, new AtomicInteger(1));
+        }
+        /**
+         * The following is important to correlate allocated resource data 
with the requestsQueue made before. If
+         * the preferredHost is requested for the first time, the state should 
reflect that the allocatedResources
+         * list is empty and NOT null.
+         */
+
+        if (!allocatedResources.containsKey(preferredHost)) {
+          allocatedResources.put(preferredHost, new 
ArrayList<SamzaResource>());
+        }
+      }
+      manager.requestResources(request);
+    }
+  }
+
+  /**
+   * Invoked each time a resource is returned from a {@link 
ClusterResourceManager}.
+   * @param samzaResource The resource that was returned from the {@link 
ClusterResourceManager}
+   */
+  public void addResource(SamzaResource samzaResource) {
+    synchronized (lock) {
+      if (hostAffinityEnabled) {
+        String hostName = samzaResource.getHost();
+        AtomicInteger requestCount = requestsToCountMap.get(hostName);
+        // Check if this host was requested for any of the resources
+        if (requestCount == null || requestCount.get() == 0) {
+          log.info(
+              " This host was not requested. {} saving the samzaResource {} in 
the buffer for ANY_HOST",
+              hostName,
+              samzaResource.getResourceID()
+          );
+          addToAllocatedResourceList(ANY_HOST, samzaResource);
+        } else {
+          // This host was indeed requested.
+          int requestCountOnThisHost = requestCount.get();
+          List<SamzaResource> allocatedResourcesOnThisHost = 
allocatedResources.get(hostName);
+          if (requestCountOnThisHost > 0) {
+            //there are pending requests for resources on this host.
+            if (allocatedResourcesOnThisHost == null || 
allocatedResourcesOnThisHost.size() < requestCountOnThisHost) {
+              log.info("Got matched samzaResource {} in the buffer for 
preferredHost: {}", samzaResource.getResourceID(), hostName);
+              addToAllocatedResourceList(hostName, samzaResource);
+            } else {
+              /**
+               * The RM may allocate more containers on a given host than 
requested. In such a case, even though the
+               * requestCount != 0, it will be greater than the total request 
count for that host. Hence, it should be
+               * assigned to ANY_HOST
+               */
+              log.info(
+                  "The number of containers already allocated on {} is greater 
than what was " +
+                      "requested, which is {}. Hence, saving the samzaResource 
{} in the buffer for ANY_HOST",
+                  new Object[]{
+                    hostName,
+                    requestCountOnThisHost,
+                    samzaResource.getResourceID()
+                  }
+              );
+              addToAllocatedResourceList(ANY_HOST, samzaResource);
+            }
+          }
+        }
+      } else {
+        log.info("Host affinity not enabled. Saving the samzaResource {} in 
the buffer for ANY_HOST", samzaResource.getResourceID());
+        addToAllocatedResourceList(ANY_HOST, samzaResource);
+      }
+    }
+  }
+
+  // Appends a samzaResource to the list of allocated resources
+  private void addToAllocatedResourceList(String host, SamzaResource 
samzaResource) {
+    List<SamzaResource> samzaResources = allocatedResources.get(host);
+    if (samzaResources != null) {
+      samzaResources.add(samzaResource);
+    } else {
+      samzaResources = new ArrayList<SamzaResource>();
+      samzaResources.add(samzaResource);
+      allocatedResources.put(host, samzaResources);
+    }
+  }
+
+  /**
+   * This method updates the state after a request is fulfilled and a resource 
starts running on a host
+   * Needs to be synchronized because the state buffers are populated by the 
AMRMCallbackHandler, whereas it is
+   * drained by the allocator thread
+   *
+   * @param request {@link SamzaResourceRequest} that was fulfilled
+   * @param assignedHost  Host to which the samzaResource was assigned
+   * @param samzaResource Allocated samzaResource resource that was used to 
satisfy this request
+   */
+  public void updateStateAfterAssignment(SamzaResourceRequest request, String 
assignedHost, SamzaResource samzaResource) {
+    synchronized (lock) {
+      requestsQueue.remove(request);
+      allocatedResources.get(assignedHost).remove(samzaResource);
+      if (hostAffinityEnabled) {
+        // assignedHost may not always be the preferred host.
+        // Hence, we should safely decrement the counter for the preferredHost
+        requestsToCountMap.get(request.getPreferredHost()).decrementAndGet();
+      }
+      // To avoid getting back excess resources
+      manager.cancelResourceRequest(request);
+    }
+  }
+
+  /**
+   * If requestQueue is empty, all extra resources in the buffer should be 
released and update the entire system's state
+   * Needs to be synchronized because it is modifying shared state buffers
+   * @return the number of resources released.
+   */
+  public int releaseExtraResources() {
+    synchronized (lock) {
+      int numReleasedResources = 0;
+      if (requestsQueue.isEmpty()) {
+        log.debug("Resource Requests Queue is empty.");
+        if (hostAffinityEnabled) {
+          List<String> allocatedHosts = getAllocatedHosts();
+          for (String host : allocatedHosts) {
+            numReleasedResources += releaseResourcesForHost(host);
+          }
+        } else {
+          numReleasedResources += releaseResourcesForHost(ANY_HOST);
+        }
+        clearState();
+      }
+      return numReleasedResources;
+    }
+  }
+
+  /**
+   * Releases a container that was allocated and assigned but could not be 
started.
+   * e.g. because of a ConnectException while trying to communicate with the 
NM.
+   * This method assumes the specified container and associated request have 
already
+   * been removed from their respective queues.
+   *
+   * @param resource the {@link SamzaResource} to release.
+   */
+  public void releaseUnstartableContainer(SamzaResource resource) {
+    log.info("Releasing unstartable container {}", resource.getResourceID());
+    manager.releaseResources(resource);
+  }
+
+
+  /**
+   * Releases all allocated resources for the specified host.
+   * @param host  the host for which the resources should be released.
+   * @return      the number of resources released.
+   */
+  private int releaseResourcesForHost(String host) {
+    int numReleasedResources = 0;
+    List<SamzaResource> samzaResources = allocatedResources.get(host);
+    if (samzaResources != null) {
+      for (SamzaResource resource : samzaResources) {
+        log.info("Releasing extra resource {} allocated on {}", 
resource.getResourceID(), host);
+        manager.releaseResources(resource);
+        numReleasedResources++;
+      }
+    }
+    return numReleasedResources;
+  }
+
+
+  /**
+   * Clears all the state variables
+   * Performed when there are no more unfulfilled requests
+   */
+  private void clearState() {
+    allocatedResources.clear();
+    requestsToCountMap.clear();
+    requestsQueue.clear();
+  }
+
+  /**
+   * Returns the list of hosts which has at least 1 allocated Resource in the 
buffer
+   * @return list of host names
+   */
+  private List<String> getAllocatedHosts() {
+    List<String> hostKeys = new ArrayList<String>();
+    for (Map.Entry<String, List<SamzaResource>> entry: 
allocatedResources.entrySet()) {
+      if (entry.getValue().size() > 0) {
+        hostKeys.add(entry.getKey());
+      }
+    }
+    return hostKeys;
+  }
+
+  /**
+   * Retrieves, but does not remove, the first allocated resource on the 
specified host.
+   *
+   * @param host  the host for which a resource is needed.
+   * @return      the first {@link SamzaResource} allocated for the specified 
host or {@code null} if there isn't one.
+   */
+
+  public SamzaResource peekResource(String host) {
+    synchronized (lock) {
+      List<SamzaResource> resourcesOnTheHost = 
this.allocatedResources.get(host);
+
+      if (resourcesOnTheHost == null || resourcesOnTheHost.isEmpty()) {
+        return null;
+      }
+      return resourcesOnTheHost.get(0);
+    }
+  }
+
+  /**
+   * Retrieves, but does not remove, the next pending request in the queue.
+   *
+   * @return  the pending request or {@code null} if there is no pending 
request.
+   */
+  public SamzaResourceRequest peekPendingRequest() {
+    synchronized (lock) {
+      return this.requestsQueue.peek();
+    }
+  }
+
+  /**
+   * Returns the number of pending SamzaResource requests in the queue.
+   * @return the number of pending requests
+   */
+  public int numPendingRequests() {
+    synchronized (lock) {
+      return this.requestsQueue.size();
+    }
+  }
+
+
+  /**
+   * Returns the list of resources allocated on a given host. If no resources 
were ever allocated on
+   * the given host, it returns null. This method makes a defensive shallow 
copy. A shallow copy is
+   * sufficient because the SamzaResource class does not expose setters.
+   *
+   * @param host hostname
+   * @return list of resources allocated on the given host, or null
+   */
+  public List<SamzaResource> getResourcesOnAHost(String host) {
+    synchronized (lock) {
+      List<SamzaResource> samzaResourceList = allocatedResources.get(host);
+      if (samzaResourceList == null)
+        return null;
+
+      return new ArrayList<SamzaResource>(samzaResourceList);
+    }
+  }
+
+  //Package private, used only in tests.
+  Map<String, AtomicInteger> getRequestsToCountMap() {
+    return Collections.unmodifiableMap(requestsToCountMap);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
new file mode 100644
index 0000000..ca277b3
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
import org.apache.samza.coordinator.JobModelManager;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SamzaAppState encapsulates state like - completedContainers, 
runningContainers. This
+ * class is also used to display information in the Samza UI. Changing any 
variable name/
+ * data structure type in this class WILL break the UI.
+ *
+ * TODO:
+ * 1.Make these variables private, final
+ * 2.Provide thread-safe accessors.
+ * //Since the scope of that change is larger, I'm tracking it to work later 
as a part of SAMZA-902
+ *
+ */
+
+public class SamzaApplicationState {
+
+  public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED }
+
+  public final JobModelManager jobModelManager;
+
+  /**
+   * JMX Server URL, if enabled
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public String jmxUrl = "";
+  /**
+   * JMX Server Tunneling URL, if enabled
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public String jmxTunnelingUrl = "";
+  /**
+   * The following state variables are required for the correct functioning of 
the TaskManager
+   * Some of them are shared between the AMRMCallbackThread and the 
ContainerAllocator thread, as mentioned below.
+   */
+
+  /**
+   * Number of containers that have completed their execution and exited 
successfully
+   */
+  public final AtomicInteger completedContainers = new AtomicInteger(0);
+
+  /**
+   * Number of failed containers
+   * */
+  public final AtomicInteger failedContainers = new AtomicInteger(0);
+
+  /**
+   * Number of containers released due to extra allocation returned by the RM
+   */
+  public final AtomicInteger releasedContainers = new AtomicInteger(0);
+
+  /**
+   * ContainerStatus of failed containers.
+   */
+  public final ConcurrentMap<String, SamzaResourceStatus> 
failedContainersStatus = new ConcurrentHashMap<String, SamzaResourceStatus>();
+
+  /**
+   * Number of containers configured for the job
+   */
+  public final AtomicInteger containerCount = new AtomicInteger(0);
+
+  /**
+   * Set of finished containers - TODO: Can be changed to a counter
+   */
+  public final Set<Integer> finishedContainers = new HashSet<Integer>();
+
+  /**
+   *  Number of containers needed for the job to be declared healthy
+   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
+   */
+  public final AtomicInteger neededResources = new AtomicInteger(0);
+
+  /**
+   *  Map of the samzaContainerId to the {@link SamzaResource} on which it is 
running
+   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
+   */
+  public final ConcurrentMap<Integer, SamzaResource> runningContainers = new 
ConcurrentHashMap<Integer, SamzaResource>(0);
+
+  /**
+   * Final status of the application
+   */
+  public SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
+
+  /**
+   * State indicating whether the job is healthy or not
+   * Modified by both the callback handler and the ContainerAllocator thread
+   */
+  public final AtomicBoolean jobHealthy = new AtomicBoolean(true);
+
+  public final AtomicInteger containerRequests = new AtomicInteger(0);
+
+  public final AtomicInteger matchedResourceRequests = new AtomicInteger(0);
+
+  public SamzaApplicationState(JobModelManager jobModelManager) {
+    this.jobModelManager = jobModelManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java
new file mode 100644
index 0000000..262c60c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
/**
 * Signals that launching a Samza container failed.
 *
 * <p>Instances may carry an underlying {@link Throwable} as their cause. Any failure raised while
 * launching a container is expected to surface as this checked type so callers can handle it explicitly.
 */
public class SamzaContainerLaunchException extends Exception {
  private static final long serialVersionUID = -3957939806997013992L;

  /** Creates an exception with neither a message nor a cause. */
  public SamzaContainerLaunchException() {
  }

  /**
   * Creates an exception with the given detail message.
   *
   * @param message human-readable description of the failure
   */
  public SamzaContainerLaunchException(String message) {
    super(message);
  }

  /**
   * Creates an exception wrapping the given cause.
   *
   * @param cause the underlying failure
   */
  public SamzaContainerLaunchException(Throwable cause) {
    super(cause);
  }

  /**
   * Creates an exception with both a detail message and an underlying cause.
   *
   * @param message human-readable description of the failure
   * @param cause   the underlying failure
   */
  public SamzaContainerLaunchException(String message, Throwable cause) {
    super(message, cause);
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
new file mode 100644
index 0000000..ba6ca2c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+/**
+ * Specification of a Samza Resource. A resource is identified by a unique 
resource ID.
+ * A resource is currently comprised of CPUs and Memory resources on a host.
+ *
+ */
+public class SamzaResource {
+  private final int numCores;
+  private final int memoryMb;
+  private final String host;
+  private final String resourceID;
+
+  //TODO: Investigate adding disk space. Mesos supports disk based 
reservations.
+
+  public SamzaResource(int numCores, int memoryMb, String host, String 
resourceID) {
+    this.numCores = numCores;
+    this.memoryMb = memoryMb;
+    this.host = host;
+    this.resourceID = resourceID;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SamzaResource resource = (SamzaResource) o;
+
+    if (numCores != resource.numCores) return false;
+    if (memoryMb != resource.memoryMb) return false;
+    return resourceID.equals(resource.resourceID);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = numCores;
+    result = 31 * result + memoryMb;
+    result = 31 * result + resourceID.hashCode();
+    return result;
+  }
+
+  public int getNumCores() {
+    return numCores;
+  }
+
+  public int getMemoryMb() {
+    return memoryMb;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public String getResourceID() {
+    return resourceID;
+  }
+}

Reply via email to