http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 7778a38..1fac7f4 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.yarn;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -40,6 +42,7 @@ import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +93,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
    */
   private final SamzaYarnAppMasterService service;
 
+  private final YarnConfig yarnConfig;
 
   /**
    * State variables to map Yarn specific callbacks into Samza specific 
callbacks.
@@ -126,18 +130,19 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     int nodePort = Integer.parseInt(nodePortString);
     int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
     YarnConfig yarnConfig = new YarnConfig(config);
+    this.yarnConfig = yarnConfig;
     int interval = yarnConfig.getAMPollIntervalMs();
 
     //Instantiate the AM Client.
     this.amClient = AMRMClientAsync.createAMRMClientAsync(interval, this);
 
-    this.state = new YarnAppState(jobModelManager, -1, containerId, 
nodeHostString, nodePort, nodeHttpPort, samzaAppState);
+    this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, 
nodeHttpPort);
 
     log.info("Initialized YarnAppState: {}", state.toString());
-    this.service = new SamzaYarnAppMasterService(config, this.state, registry);
+    this.service = new SamzaYarnAppMasterService(config, samzaAppState, 
this.state, registry, hConfig);
 
     log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport 
{}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
-    this.lifecycle = new 
SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), 
yarnConfig.getContainerMaxCpuCores(), state, amClient );
+    this.lifecycle = new 
SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), 
yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient );
 
     yarnContainerRunner = new YarnContainerRunner(config, hConfig);
   }
@@ -312,6 +317,33 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     amClient.stop();
     log.info("Stopping the AM service " );
     service.onShutdown();
+
+    if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
+      cleanupStagingDir();
+    }
+  }
+
+  /**
+   * Best-effort removal of the job's staging directory. Does nothing when no
+   * staging directory is configured; a failure to obtain the file system is
+   * logged together with its stack trace and is not rethrown.
+   */
+  private void cleanupStagingDir() {
+    String yarnJobStagingDirectory = yarnConfig.getYarnJobStagingDirectory();
+    if(yarnJobStagingDirectory != null) {
+      JobContext context = new JobContext();
+      context.setAppStagingDir(new Path(yarnJobStagingDirectory));
+      FileSystem fs = null;
+      try {
+        fs = FileSystem.get(hConfig);
+      } catch (IOException e) {
+        log.error("Unable to clean up file system", e);
+        return;
+      }
+      if(fs != null) {
+        YarnJobUtil.cleanupStagingDir(context, fs);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 
b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 93660c7..93176ff 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -15,7 +15,8 @@
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.
--@ val state: org.apache.samza.job.yarn.SamzaAppState
+-@ val state: org.apache.samza.job.yarn.YarnAppState
+-@ val samzaAppState: org.apache.samza.clustermanager.SamzaApplicationState
 -@ val config: scala.collection.immutable.TreeMap[String, String]
 -@ val rmHttpAddress: String
 -@ val jobName: String = config.get("job.name").getOrElse("MISSING JOB NAME")
@@ -91,10 +92,10 @@
               %a(target="_blank" 
href="http://#{state.nodeHost}:#{state.nodeHttpPort.toString}/node/containerlogs/#{state.amContainerId.toString}/#{username}";)=
 state.amContainerId.toString
           %tr
             %td.key JMX server url
-            %td= state.jmxUrl
+            %td= samzaAppState.jmxUrl
           %tr
             %td.key JMX server tunneling url
-            %td= state.jmxTunnelingUrl
+            %td= samzaAppState.jmxTunnelingUrl
 
     %div.tab-pane#containers
       %h2 Containers
@@ -102,16 +103,16 @@
         %tr
           %tr
             %td.key Completed
-            %td= state.completedContainers.toString
+            %td= samzaAppState.completedContainers.toString
           %tr
             %td.key Needed
-            %td= state.neededContainers.toString
+            %td= samzaAppState.neededContainers.toString
           %tr
             %td.key Failed
-            %td= state.failedContainers.toString
+            %td= samzaAppState.failedContainers.toString
           %tr
             %td.key Released
-            %td= state.releasedContainers.toString
+            %td= samzaAppState.releasedContainers.toString
 
       %h2 Running Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
@@ -124,7 +125,7 @@
             %th Up Time
             %th JMX access
         %tbody
-          - for((containerId, container) <- state.runningContainers)
+          - for((containerId, container) <- state.runningYarnContainers)
             %tr
               %td #{containerId.toString}
               %td
@@ -136,8 +137,8 @@
               %td
                 Up time: #{container.upTimeStr()}
               %td
-                Ordinary: 
#{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
-                Tunneling: 
#{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+                Ordinary: 
#{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(containerId, 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
+                Tunneling: 
#{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(containerId, 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
 
       %h2 Failed Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
@@ -162,10 +163,10 @@
         %tbody
           %tr
             %td.key Total
-            %td= state.containerCount.toString
+            %td= samzaAppState.containerCount.toString
           %tr
             %td.key Finished
-            %td= state.finishedContainers.size.toString
+            %td= samzaAppState.finishedContainers.toString
 
       %h3 TaskName Assignment
       %table.table.table-striped.table-bordered.tablesorter#taskids-table
@@ -176,8 +177,8 @@
             %th SystemStreamPartitions
             %th Container
         %tbody
-          - for((containerId, container) <- state.runningContainers)
-            - val containerModel = 
state.jobCoordinator.jobModel.getContainers.get(containerId)
+          - for((containerId, container) <- state.runningYarnContainers)
+            - val containerModel = 
samzaAppState.jobModelManager.jobModel.getContainers.get(containerId)
             - for((taskName, taskModel) <- containerModel.getTasks)
               %tr
                 %td= containerId

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
deleted file mode 100644
index 1fb18be..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import java.io.IOException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-
-import scala.collection.JavaConversions.asScalaBuffer
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Container, 
ContainerStatus, NodeReport}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.Config
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.metrics.JmxServer
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.util.hadoop.HttpFileSystem
-import org.apache.samza.util.Logging
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.SamzaException
-
-/**
- * When YARN executes an application master, it needs a bash command to
- * execute. For Samza, YARN will execute this main method when starting Samza's
- * YARN application master.
- *
- * <br/><br/>
- *
- * The main method gets all of the environment variables (passed by Samza's
- * YARN client, and YARN itself), and wires up everything to run Samza's
- * application master.
- */
-object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
-  val DEFAULT_POLL_INTERVAL_MS: Int = 1000
-  var listeners: List[YarnAppMasterListener] = null
-  var storedException: Throwable = null
-
-  def main(args: Array[String]) {
-    putMDC("containerName", "samza-application-master")
-    val containerIdStr = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
-    info("got container id: %s" format containerIdStr)
-    val containerId = ConverterUtils.toContainerId(containerIdStr)
-    val applicationAttemptId = containerId.getApplicationAttemptId
-    info("got app attempt id: %s" format applicationAttemptId)
-    val nodeHostString = 
System.getenv(ApplicationConstants.Environment.NM_HOST.toString)
-    info("got node manager host: %s" format nodeHostString)
-    val nodePortString = 
System.getenv(ApplicationConstants.Environment.NM_PORT.toString)
-    info("got node manager port: %s" format nodePortString)
-    val nodeHttpPortString = 
System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
-    info("got node manager http port: %s" format nodeHttpPortString)
-    val coordinatorSystemConfig = new 
MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG),
 classOf[Config]))
-    info("got coordinator system config: %s" format coordinatorSystemConfig)
-    val registry = new MetricsRegistryMap
-    val jobCoordinator = JobModelManager(coordinatorSystemConfig, registry)
-    val config = jobCoordinator.jobModel.getConfig
-    val yarnConfig = new YarnConfig(config)
-    info("got config: %s" format coordinatorSystemConfig)
-    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can 
not find the job name")))
-    putMDC("jobId", config.getJobId.getOrElse("1"))
-    val hConfig = new YarnConfiguration
-    hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
-    val interval = yarnConfig.getAMPollIntervalMs
-    val amClient = 
AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this)
-    val clientHelper = new ClientHelper(hConfig)
-    val containerMem = yarnConfig.getContainerMaxMemoryMb
-    val containerCpu = yarnConfig.getContainerMaxCpuCores
-    val jmxServer = if (yarnConfig.getJmxServerEnabled) Some(new JmxServer()) 
else None
-    val jobContext = new JobContext
-    Option(yarnConfig.getYarnJobStagingDirectory).map {
-      jobStagingDirectory => jobContext.setAppStagingDir(new 
Path(jobStagingDirectory))
-    }
-
-    // wire up all of the yarn event listeners
-    val state = new SamzaAppState(jobCoordinator, -1, containerId, 
nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
-    try {
-      if (jmxServer.isDefined) {
-        state.jmxUrl = jmxServer.get.getJmxUrl
-        state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl
-      }
-
-      val service = new SamzaAppMasterService(config, state, registry, 
clientHelper, hConfig)
-      val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, 
state, amClient)
-      val metrics = new SamzaAppMasterMetrics(config, state, registry)
-      val taskManager = new SamzaTaskManager(config, state, amClient, hConfig)
-
-      listeners =  List(service, lifecycle, metrics, taskManager)
-      run(amClient, listeners, hConfig, interval)
-    } finally {
-      if (state.status != FinalApplicationStatus.UNDEFINED) {
-        YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(hConfig))
-      }
-
-      // jmxServer has to be stopped or will prevent process from exiting.
-      if (jmxServer.isDefined) {
-        jmxServer.get.stop
-      }
-    }
-  }
-
-  def run(amClient: AMRMClientAsync[ContainerRequest], listeners: 
List[YarnAppMasterListener], hConfig: YarnConfiguration, interval: Int): Unit = 
{
-    try {
-      amClient.init(hConfig)
-      amClient.start
-      listeners.foreach(_.onInit)
-      var isShutdown: Boolean = false
-      // have the loop to prevent the process from exiting until the job is to 
shutdown or error occurs on amClient
-      while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || 
_) && storedException == null) {
-        try {
-          Thread.sleep(interval)
-        } catch {
-          case e: InterruptedException => {
-            isShutdown = true
-            info("got interrupt in app master thread, so shutting down")
-          }
-        }
-      }
-    } finally {
-      // listeners has to be stopped
-      listeners.foreach(listener => try {
-        listener.onShutdown
-      } catch {
-        case e: Exception => warn("Listener %s failed to shutdown." format 
listener, e)
-      })
-
-      // amClient has to be stopped
-      amClient.stop
-    }
-  }
-
-  override def onContainersCompleted(statuses: 
java.util.List[ContainerStatus]): Unit =
-    statuses.foreach(containerStatus => 
listeners.foreach(_.onContainerCompleted(containerStatus)))
-
-  override def onContainersAllocated(containers: java.util.List[Container]): 
Unit =
-    containers.foreach(container => 
listeners.foreach(_.onContainerAllocated(container)))
-
-  override def onShutdownRequest: Unit = listeners.foreach(_.onReboot)
-
-  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit 
= Unit
-
-  // TODO need to think about meaningful SAMZA's progress
-  override def getProgress: Float = 0.0F
-
-  override def onError(e: Throwable): Unit = {
-    error("Error occured in amClient's callback", e)
-    storedException = e
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
deleted file mode 100644
index 2a5c0d8..0000000
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
-
-/**
- * Responsible for managing the lifecycle of the application master. Mostly,
- * this means registering and unregistering with the RM, and shutting down
- * when the RM tells us to Reboot.
- */
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: 
SamzaAppState, amClient: AMRMClientAsync[ContainerRequest]) extends 
YarnAppMasterListener with Logging {
-  var validResourceRequest = true
-  var shutdownMessage: String = null
-
-  override def onInit() {
-    val host = state.nodeHost
-    val response = amClient.registerApplicationMaster(host, 
state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort))
-
-    // validate that the YARN cluster can handle our container resource 
requirements
-    val maxCapability = response.getMaximumResourceCapability
-    val maxMem = maxCapability.getMemory
-    val maxCpu = maxCapability.getVirtualCores
-
-    info("Got AM register response. The YARN RM supports container requests 
with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
-
-    if (containerMem > maxMem || containerCpu > maxCpu) {
-      shutdownMessage = "The YARN cluster is unable to run your job due to 
unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." 
format (containerMem, containerCpu)
-      error(shutdownMessage)
-      validResourceRequest = false
-      state.status = FinalApplicationStatus.FAILED
-      state.jobHealthy.set(false)
-    }
-  }
-
-  override def onReboot() {
-    throw new SamzaException("Received a reboot signal from the RM, so 
throwing an exception to reboot the AM.")
-  }
-
-  override def onShutdown() {
-    info("Shutting down.")
-    amClient.unregisterApplicationMaster(state.status, shutdownMessage, null)
-  }
-
-  override def shouldShutdown = !validResourceRequest
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
deleted file mode 100644
index 054d8b6..0000000
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.JvmMetrics
-import org.apache.samza.config.Config
-import org.apache.samza.task.TaskContext
-import org.apache.samza.Partition
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.util.Util
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
-import java.util.Timer
-import java.util.TimerTask
-import org.apache.samza.metrics.MetricsHelper
-
-object SamzaAppMasterMetrics {
-  val sourceName = "ApplicationMaster"
-}
-
-/**
- * Responsible for wiring up Samza's metrics. Given that Samza has a metric
- * registry, we might as well use it. This class takes Samza's application
- * master state, and converts it to metrics.
- */
-class SamzaAppMasterMetrics(
-  val config: Config,
-  val state: SamzaAppState,
-  val registry: ReadableMetricsRegistry) extends MetricsHelper with 
YarnAppMasterListener with Logging {
-
-  val jvm = new JvmMetrics(registry)
-  val reporters = config.getMetricReporterNames.map(reporterName => {
-    val metricsFactoryClassName = config
-      .getMetricsFactoryClass(reporterName)
-      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class 
config" format reporterName))
-
-    val reporter =
-      Util
-        .getObj[MetricsReporterFactory](metricsFactoryClassName)
-        .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, 
config)
-
-    reporter.register(SamzaAppMasterMetrics.sourceName, registry)
-    (reporterName, reporter)
-  }).toMap
-
-  override def onInit() {
-    val mRunningContainers = newGauge("running-containers", () => 
state.runningContainers.size)
-    val mNeededContainers = newGauge("needed-containers", () => 
state.neededContainers.get())
-    val mCompletedContainers = newGauge("completed-containers", () => 
state.completedContainers.get())
-    val mFailedContainers = newGauge("failed-containers", () => 
state.failedContainers.get())
-    val mReleasedContainers = newGauge("released-containers", () => 
state.releasedContainers.get())
-    val mContainers = newGauge("container-count", () => state.containerCount)
-    val mHost = newGauge("http-host", () => state.nodeHost)
-    val mTrackingPort = newGauge("http-port", () => state.trackingUrl.getPort)
-    val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort)
-    val mAppAttemptId = newGauge("app-attempt-id", () => 
state.appAttemptId.toString)
-    val mJobHealthy = newGauge("job-healthy", () => if 
(state.jobHealthy.get()) 1 else 0)
-    val mLocalityMatchedRequests = newGauge(
-      "locality-matched",
-      () => {
-        if (state.containerRequests.get() != 0) {
-          state.matchedContainerRequests.get() / state.containerRequests.get()
-        } else {
-          0L
-        }
-      })
-
-    jvm.start
-    reporters.values.foreach(_.start)
-  }
-
-  override def onShutdown() {
-    reporters.values.foreach(_.stop)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
deleted file mode 100644
index 979d81d..0000000
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
-import org.apache.samza.coordinator.stream.messages.SetConfig
-import org.apache.samza.util.Logging
-import org.apache.samza.config.Config
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.SamzaException
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.webapp.ApplicationMasterRestServlet
-import org.apache.samza.webapp.ApplicationMasterWebServlet
-
-/**
-  * Samza's application master runs a very basic HTTP/JSON service to allow
-  * dashboards to check on the status of a job. SamzaAppMasterService starts
-  * up the web service when initialized.
-  * <p />
-  * Besides the HTTP/JSON service endpoints, it also starts an optional
-  * SecurityManager which takes care of the security needs when running in
-  * a secure environment.
-  */
-class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: 
ReadableMetricsRegistry, clientHelper: ClientHelper, yarnConfiguration: 
YarnConfiguration) extends YarnAppMasterListener with Logging {
-  var rpcApp: HttpServer = null
-  var webApp: HttpServer = null
-  val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
-  var securityManager: Option[SamzaAppMasterSecurityManager] = None
-
-  override def onInit() {
-    // try starting the samza AM dashboard at a random rpc and tracking port
-    info("Starting webapp at a random rpc and tracking port")
-
-    rpcApp = new HttpServer(resourceBasePath = "scalate")
-    rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, 
registry))
-    rpcApp.start
-
-    webApp = new HttpServer(resourceBasePath = "scalate")
-    webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
-    webApp.start
-
-    state.jobCoordinator.start
-    state.rpcUrl = rpcApp.getUrl
-    state.trackingUrl = webApp.getUrl
-    state.coordinatorUrl = state.jobCoordinator.server.getUrl
-
-    //write server url to coordinator stream
-    val coordinatorStreamWriter: CoordinatorStreamWriter = new 
CoordinatorStreamWriter(config)
-    coordinatorStreamWriter.start()
-    coordinatorStreamWriter.sendMessage(SetConfig.TYPE, SERVER_URL_OPT, 
state.coordinatorUrl.toString)
-    coordinatorStreamWriter.stop()
-    debug("sent server url message with value: %s " format 
state.coordinatorUrl.toString)
-
-    info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" 
format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
-
-    // start YarnSecurityManger for a secure cluster
-    if (UserGroupInformation.isSecurityEnabled) {
-      securityManager = Option {
-        val securityManager = new SamzaAppMasterSecurityManager(config, 
yarnConfiguration)
-        securityManager.start
-        securityManager
-      }
-    }
-  }
-
-  override def onShutdown() {
-    if (rpcApp != null) {
-      rpcApp.stop
-    }
-
-    if (webApp != null) {
-      webApp.stop
-    }
-
-    state.jobCoordinator.stop
-
-    securityManager.map {
-      securityManager => securityManager.stop
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
index 2ed9baf..c9c1e18 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
@@ -33,7 +33,7 @@ import org.apache.samza.util.Logging
  * when the RM tells us to Reboot.
  */
 //This class is used in the refactored code path as called by run-jc.sh
-class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: 
YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging {
+class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, 
samzaAppState: SamzaApplicationState, state: YarnAppState, amClient: 
AMRMClientAsync[ContainerRequest]) extends Logging {
   var validResourceRequest = true
   var shutdownMessage: String = null
   var webApp: SamzaYarnAppMasterService = null
@@ -51,8 +51,8 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, 
containerCpu: Int, state: Y
       shutdownMessage = "The YARN cluster is unable to run your job due to 
unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." 
format (containerMem, containerCpu)
       error(shutdownMessage)
       validResourceRequest = false
-      state.samzaAppState.status = SamzaAppStatus.FAILED;
-      state.samzaAppState.jobHealthy.set(false)
+      samzaAppState.status = SamzaAppStatus.FAILED;
+      samzaAppState.jobHealthy.set(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f62bec1..5f2bfc5 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -19,42 +19,47 @@
 
 package org.apache.samza.job.yarn
 
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.Config
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.util.Logging
+import org.apache.samza.webapp.{ApplicationMasterWebServlet, 
ApplicationMasterRestServlet}
 
 /**
- * Samza's application master runs a very basic HTTP/JSON service to allow
- * dashboards to check on the status of a job. SamzaAppMasterService starts
- * up the web service when initialized.
- */
+  * Samza's application master runs a very basic HTTP/JSON service to allow
+  * dashboards to check on the status of a job. SamzaAppMasterService starts
+  * up the web service when initialized.
+  */
 //This class is used in the refactored code path as called by run-jc.sh
 
-class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: 
ReadableMetricsRegistry) extends  Logging {
+class SamzaYarnAppMasterService(config: Config, samzaAppState: 
SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, 
yarnConfiguration: YarnConfiguration) extends  Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
   val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+  var securityManager: Option[SamzaAppMasterSecurityManager] = None
 
-   def onInit() {
+  def onInit() {
     // try starting the samza AM dashboard at a random rpc and tracking port
     info("Starting webapp at a random rpc and tracking port")
 
     rpcApp = new HttpServer(resourceBasePath = "scalate")
-    //TODO: Since the state has changed into Samza specific and Yarn specific 
states, this UI has to be refactored too.
-    //rpcApp.addServlet("/*", refactor ApplicationMasterRestServlet(config, 
state, registry))
+
+    rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, 
samzaAppState, state, registry))
     rpcApp.start
 
     webApp = new HttpServer(resourceBasePath = "scalate")
-    //webApp.addServlet("/*", refactor ApplicationMasterWebServlet(config, 
state))
+    webApp.addServlet("/*", new ApplicationMasterWebServlet(config, 
samzaAppState, state))
     webApp.start
 
-    state.jobModelManager.start
+    samzaAppState.jobModelManager.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl
-    state.coordinatorUrl = state.jobModelManager.server.getUrl
+    state.coordinatorUrl = samzaAppState.jobModelManager.server.getUrl
 
     //write server url to coordinator stream
     val coordinatorStreamWriter: CoordinatorStreamWriter = new 
CoordinatorStreamWriter(config)
@@ -64,9 +69,19 @@ class SamzaYarnAppMasterService(config: Config, state: 
YarnAppState, registry: R
     debug("Sent server url message with value: %s " format 
state.coordinatorUrl.toString)
 
     info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" 
format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
+
+    // start YarnSecurityManager for a secure cluster
+    if (UserGroupInformation.isSecurityEnabled) {
+      securityManager = Option {
+        val securityManager = new SamzaAppMasterSecurityManager(config, 
yarnConfiguration)
+        securityManager.start
+        securityManager
+      }
+    }
+
   }
 
-   def onShutdown() {
+  def onShutdown() {
     if (rpcApp != null) {
       rpcApp.stop
     }
@@ -75,6 +90,11 @@ class SamzaYarnAppMasterService(config: Config, state: 
YarnAppState, registry: R
       webApp.stop
     }
 
-    state.jobModelManager.stop
+    samzaAppState.jobModelManager.stop
+
+    securityManager.map {
+      securityManager => securityManager.stop
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 3ca1f0d..46dc4d1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -86,11 +86,11 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
     }
     logger.info("Inside YarnJob: fwk_path is %s, ver is %s use it directly " 
format(fwkPath, fwkVersion))
 
-    var cmdExec = "./__package/bin/run-am.sh" // default location
+    var cmdExec = "./__package/bin/run-jc.sh" // default location
 
     if (!fwkPath.isEmpty()) {
       // if we have framework installed as a separate package - use it
-      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-am.sh"
+      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh"
 
       logger.info("Using FWK path: " + "export SAMZA_LOG_DIR=%s && ln -sfn %s 
logs && exec %s 1>logs/%s 2>logs/%s".
              format(ApplicationConstants.LOG_DIR_EXPANSION_VAR, 
ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec,

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index a40ab72..cdd389c 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -19,17 +19,18 @@
 
 package org.apache.samza.webapp
 
+import org.apache.samza.clustermanager.SamzaApplicationState
 import org.scalatra._
 import scalate.ScalateSupport
 import org.apache.samza.config.Config
-import org.apache.samza.job.yarn.{SamzaAppState, ClientHelper}
+import org.apache.samza.job.yarn.{YarnAppState, ClientHelper}
 import org.apache.samza.metrics._
 import scala.collection.JavaConversions._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import java.util.HashMap
 import org.apache.samza.serializers.model.SamzaObjectMapper
 
-class ApplicationMasterRestServlet(config: Config, state: SamzaAppState, 
registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterRestServlet(config: Config, samzaAppState: 
SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) 
extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
   val client = new ClientHelper(yarnConfig)
   val jsonMapper = SamzaObjectMapper.getObjectMapper
@@ -78,11 +79,11 @@ class ApplicationMasterRestServlet(config: Config, state: 
SamzaAppState, registr
   get("/am") {
     val containers = new HashMap[String, HashMap[String, Object]]
 
-    state.runningContainers.foreach {
+    state.runningYarnContainers.foreach {
       case (containerId, container) =>
         val yarnContainerId = container.id.toString
         val containerMap = new HashMap[String, Object]
-        val taskModels = 
state.jobCoordinator.jobModel.getContainers.get(containerId).getTasks
+        val taskModels = 
samzaAppState.jobModelManager.jobModel.getContainers.get(containerId).getTasks
         containerMap.put("yarn-address", container.nodeHttpAddress)
         containerMap.put("start-time", container.startTime.toString)
         containerMap.put("up-time", container.upTime.toString)

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index 605332a..a32cd65 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -19,16 +19,17 @@
 
 package org.apache.samza.webapp
 
+import org.apache.samza.clustermanager.SamzaApplicationState
 import org.scalatra._
 import scalate.ScalateSupport
-import org.apache.samza.job.yarn.{SamzaAppState}
+import org.apache.samza.job.yarn.YarnAppState
 import org.apache.samza.config.Config
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
-class ApplicationMasterWebServlet(config: Config, state: SamzaAppState) 
extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterWebServlet(config: Config, samzaAppState: 
SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with 
ScalateSupport {
   val yarnConfig = new YarnConfiguration
 
   before() {
@@ -39,6 +40,7 @@ class ApplicationMasterWebServlet(config: Config, state: 
SamzaAppState) extends
     layoutTemplate("/WEB-INF/views/index.scaml",
       "config" -> TreeMap(config.sanitize.toMap.toArray: _*),
       "state" -> state,
+      "samzaAppState" -> samzaAppState,
       "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
deleted file mode 100644
index e21aded..0000000
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestContainerAllocator extends TestContainerAllocatorCommon {
-
-  private final Config config = new MapConfig(new HashMap<String, String>() {
-    {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", 
"org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
-      put("yarn.package.path", "/foo");
-      put("task.inputs", "test-system.test-stream");
-      put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
-      put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.container.request.timeout.ms", "3");
-      put("yarn.allocator.sleep.ms", "1");
-    }
-  });
-
-  @Override
-  protected Config getConfig() {
-    return config;
-  }
-
-  @Override
-  protected MockContainerRequestState createContainerRequestState(
-      AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
-    return new MockContainerRequestState(amClient, false);
-  }
-
-  /**
-   * Test request containers with no containerToHostMapping makes the right 
number of requests
-   */
-  @Test
-  public void testRequestContainersWithNoMapping() throws Exception {
-    int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, 
String>();
-    for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
-    }
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    // If host-affinty is not enabled, it doesn't update the requestMap
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
-  }
-  /**
-   * Adds all containers returned to ANY_HOST only
-   */
-  @Test
-  public void testAddContainer() throws Exception {
-    assertNull(requestState.getContainersOnAHost("host1"));
-    assertNull(requestState.getContainersOnAHost(ANY_HOST));
-
-    
containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "host1", 123));
-    
containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"),
 "host3", 123));
-
-    assertNull(requestState.getContainersOnAHost("host1"));
-    assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-    assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
-  }
-
-  /**
-   * Test requestContainers
-   */
-  @Test
-  public void testRequestContainers() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, 
String>() {
-      {
-        put(0, "host1");
-        put(1, "host2");
-        put(2, null);
-        put(3, "host1");
-      }
-    };
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(4, testAMRMClient.requests.size());
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    // If host-affinty is not enabled, it doesn't update the requestMap
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
deleted file mode 100644
index 0bbd48d..0000000
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.MockContainerListener;
-import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Handles all common fields/tests for ContainerAllocators.
- */
-public abstract class TestContainerAllocatorCommon {
-  protected static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  protected final HttpServer server = new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class));
-
-  protected AMRMClientAsyncImpl amRmClientAsync;
-  protected TestAMRMClientImpl testAMRMClient;
-  protected MockContainerRequestState requestState;
-  protected AbstractContainerAllocator containerAllocator;
-  protected Thread allocatorThread;
-  protected ContainerUtil containerUtil;
-
-  protected SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 
2);
-
-  protected abstract Config getConfig();
-  protected abstract MockContainerRequestState 
createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> 
amClient);
-
-  private JobModelManager getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
-    for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, 
TaskModel>());
-      containers.put(i, container);
-    }
-    JobModel jobModel = new JobModel(getConfig(), containers);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-
-  @Before
-  public void setup() throws Exception {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-
-    // Initialize certain state variables
-    state.coordinatorUrl = new URL("http://localhost:7778/");
-
-    containerUtil = TestUtil.getContainerUtil(getConfig(), state);
-
-    requestState = createContainerRequestState(amRmClientAsync);
-    containerAllocator = new HostAwareContainerAllocator(
-        amRmClientAsync,
-        containerUtil,
-        new YarnConfig(getConfig())
-    );
-    Field requestStateField = 
containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    requestStateField.set(containerAllocator, requestState);
-
-    allocatorThread = new Thread(containerAllocator);
-  }
-
-  @After
-  public void teardown() throws Exception {
-    containerAllocator.setIsRunning(false);
-    allocatorThread.join();
-  }
-
-  /**
-   * If the container fails to start e.g because it fails to connect to a NM 
on a host that
-   * is down, the allocator should request a new container on a different host.
-   */
-  @Test
-  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    final Container container = TestUtil
-        
.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"),
 "host2", 123);
-    final Container container1 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "host1", 123);
-
-    ((MockContainerUtil) containerUtil).containerStartException = new 
IOException("You shall not... connect to the NM!");
-
-    Runnable releasedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        // The failed container should be released. The successful one should 
not.
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(1, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container.getId()));
-      }
-    };
-
-    Runnable assignedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        // Test that the first request assignment had a preferred host and the 
retry didn't
-        assertEquals(2, requestState.assignedRequests.size());
-
-        SamzaContainerRequest request = requestState.assignedRequests.remove();
-        assertEquals(0, request.expectedContainerId);
-        assertEquals("host2", request.getPreferredHost());
-
-        request = requestState.assignedRequests.remove();
-        assertEquals(0, request.expectedContainerId);
-        assertEquals("ANY_HOST", request.getPreferredHost());
-
-        // This routine should be called after the retry is assigned, but 
before it's started.
-        // So there should still be 1 container needed because 
neededContainers should not be decremented for a failed start.
-        assertEquals(1, state.neededContainers.get());
-      }
-    };
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, 
null, releasedContainerAssertions, assignedContainerAssertions, null);
-    requestState.registerContainerListener(listener);
-    state.neededContainers.set(1); // Normally this would be done in the 
SamzaTaskManager
-
-    // Only request 1 container and we should see 2 assignments in the 
assertions above (because of the retry)
-    containerAllocator.requestContainer(0, "host2");
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-
-    allocatorThread.start();
-
-    listener.verify();
-  }
-
-
-  /**
-   * Extra allocated containers that are returned by the RM and unused by the 
AM should be released.
-   * Containers are considered "extra" only when there are no more pending 
requests to fulfill
-   * @throws Exception
-   */
-  @Test
-  public void testAllocatorReleasesExtraContainers() throws Exception {
-    final Container container = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"),
 "host1", 123);
-    final Container container1 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "host1", 123);
-    final Container container2 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"),
 "host2", 123);
-
-    Runnable releasedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(2, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
-        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
-        // Test that state is cleaned up
-        assertEquals(0, requestState.getRequestsQueue().size());
-        assertEquals(0, requestState.getRequestsToCountMap().size());
-        assertNull(requestState.getContainersOnAHost("host1"));
-        assertNull(requestState.getContainersOnAHost("host2"));
-      }
-    };
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, 
null, releasedContainerAssertions, null, null);
-    requestState.registerContainerListener(listener);
-
-    containerAllocator.requestContainer(0, "host1");
-
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-    containerAllocator.addContainer(container2);
-
-    allocatorThread.start();
-
-    listener.verify();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
deleted file mode 100644
index 402fe78..0000000
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-
-public class TestContainerRequestState {
-  private AMRMClientAsyncImpl amRmClientAsync;
-  private TestAMRMClientImpl testAMRMClient;
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  @Before
-  public void setup() {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-  }
-
-  /**
-   * Test state after a request is submitted
-   */
-  @Test
-  public void testUpdateRequestState() {
-    // Host-affinity is enabled
-    ContainerRequestState state = new ContainerRequestState(amRmClientAsync, 
true);
-    SamzaContainerRequest request = new SamzaContainerRequest(0, "abc");
-    state.updateRequestState(request);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(1, testAMRMClient.requests.size());
-    assertEquals(request.getIssuedRequest(), testAMRMClient.requests.get(0));
-
-    assertNotNull(state.getRequestsQueue());
-    assertTrue(state.getRequestsQueue().size() == 1);
-
-    assertNotNull(state.getRequestsToCountMap());
-    assertNotNull(state.getRequestsToCountMap().get("abc"));
-    assertEquals(1, state.getRequestsToCountMap().get("abc").get());
-
-    assertNotNull(state.getContainersOnAHost("abc"));
-    assertEquals(0, state.getContainersOnAHost("abc").size());
-
-    // Host-affinity is not enabled
-    ContainerRequestState state1 = new ContainerRequestState(amRmClientAsync, 
false);
-    SamzaContainerRequest request1 = new SamzaContainerRequest(1, null);
-    state1.updateRequestState(request1);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(2, testAMRMClient.requests.size());
-
-    AMRMClient.ContainerRequest expectedContainerRequest = 
request1.getIssuedRequest();
-    AMRMClient.ContainerRequest actualContainerRequest = 
testAMRMClient.requests.get(1);
-
-    assertEquals(expectedContainerRequest.getCapability(), 
actualContainerRequest.getCapability());
-    assertEquals(expectedContainerRequest.getPriority(), 
actualContainerRequest.getPriority());
-    assertNull(actualContainerRequest.getNodes());
-
-    assertNotNull(state1.getRequestsQueue());
-    assertTrue(state1.getRequestsQueue().size() == 1);
-
-    assertNotNull(state1.getRequestsToCountMap());
-    assertNull(state1.getRequestsToCountMap().get(ANY_HOST));
-
-  }
-
-  /**
-   * Test addContainer() updates the state correctly
-   */
-  @Test
-  public void testAddContainer() {
-    // Add container to ANY_LIST when host-affinity is not enabled
-    ContainerRequestState state = new ContainerRequestState(amRmClientAsync, 
false);
-    Container container = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "abc", 123);
-    state.addContainer(container);
-
-    assertNotNull(state.getRequestsQueue());
-    assertNotNull(state.getRequestsToCountMap());
-    assertNotNull(state.getContainersOnAHost(ANY_HOST));
-
-    assertEquals(1, state.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container, state.getContainersOnAHost(ANY_HOST).get(0));
-
-    // Container Allocated when there is no request in queue
-    ContainerRequestState state1 = new ContainerRequestState(amRmClientAsync, 
true);
-    Container container1 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"),
 "zzz", 123);
-    state1.addContainer(container1);
-
-    assertNotNull(state1.getRequestsQueue());
-    assertEquals(0, state1.getRequestsQueue().size());
-
-    assertNull(state1.getContainersOnAHost("zzz"));
-    assertNotNull(state1.getContainersOnAHost(ANY_HOST));
-    assertEquals(1, state1.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container1, state1.getContainersOnAHost(ANY_HOST).get(0));
-
-    // Container Allocated on a Requested Host
-    state1.updateRequestState(new SamzaContainerRequest(0, "abc"));
-
-    assertNotNull(state1.getRequestsQueue());
-    assertEquals(1, state1.getRequestsQueue().size());
-
-    assertNotNull(state1.getRequestsToCountMap());
-    assertNotNull(state1.getRequestsToCountMap().get("abc"));
-    assertEquals(1, state1.getRequestsToCountMap().get("abc").get());
-
-    state1.addContainer(container);
-
-    assertNotNull(state1.getContainersOnAHost("abc"));
-    assertEquals(1, state1.getContainersOnAHost("abc").size());
-    assertEquals(container, state1.getContainersOnAHost("abc").get(0));
-
-    // Container Allocated on host that was not requested
-    Container container2 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"),
 "xyz", 123);
-    state1.addContainer(container2);
-
-    assertNull(state1.getContainersOnAHost("xyz"));
-    assertNotNull(state1.getContainersOnAHost(ANY_HOST));
-    assertEquals(2, state1.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container2, state1.getContainersOnAHost(ANY_HOST).get(1));
-
-    // Extra containers were allocated on a host that was requested
-    Container container3 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000005"),
 "abc", 123);
-    state1.addContainer(container3);
-
-    assertEquals(3, state1.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container3, state1.getContainersOnAHost(ANY_HOST).get(2));
-  }
-
-  /**
-   * Test request state after container is assigned to a host
-   * * Assigned on requested host
-   * * Assigned on any host
-   */
-  @Test
-  public void testContainerAssignment() throws Exception {
-    // Host-affinity enabled
-    ContainerRequestState state = new ContainerRequestState(amRmClientAsync, 
true);
-    SamzaContainerRequest request = new SamzaContainerRequest(0, "abc");
-    SamzaContainerRequest request1 = new SamzaContainerRequest(0, "def");
-
-    state.updateRequestState(request);
-    state.updateRequestState(request1);
-
-    Container container = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"),
 "abc", 123);
-    Container container1 = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"),
 "zzz", 123);
-    state.addContainer(container);
-    state.addContainer(container1);
-
-    assertEquals(2, state.getRequestsQueue().size());
-    assertEquals(2, state.getRequestsToCountMap().size());
-
-    assertNotNull(state.getContainersOnAHost("abc"));
-    assertEquals(1, state.getContainersOnAHost("abc").size());
-    assertEquals(container, state.getContainersOnAHost("abc").get(0));
-
-    assertNotNull(state.getContainersOnAHost("def"));
-    assertEquals(0, state.getContainersOnAHost("def").size());
-
-    assertNotNull(state.getContainersOnAHost(ANY_HOST));
-    assertEquals(1, state.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container1, state.getContainersOnAHost(ANY_HOST).get(0));
-
-    // Container assigned on the requested host
-    state.updateStateAfterAssignment(request, "abc", container);
-
-    assertEquals(1, state.getRequestsQueue().size());
-    assertEquals(request1, state.getRequestsQueue().peek());
-
-    assertNotNull(state.getRequestsToCountMap().get("abc"));
-    assertEquals(0, state.getRequestsToCountMap().get("abc").get());
-
-    assertNotNull(state.getContainersOnAHost("abc"));
-    assertEquals(0, state.getContainersOnAHost("abc").size());
-
-    // Container assigned on any host
-    state.updateStateAfterAssignment(request1, ANY_HOST, container1);
-
-    assertEquals(0, state.getRequestsQueue().size());
-
-    assertNotNull(state.getRequestsToCountMap().get("def"));
-    assertEquals(0, state.getRequestsToCountMap().get("def").get());
-
-    assertNotNull(state.getContainersOnAHost(ANY_HOST));
-    assertEquals(0, state.getContainersOnAHost(ANY_HOST).size());
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
deleted file mode 100644
index ead7200..0000000
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.yarn.util.MockContainerListener;
-import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestHostAwareContainerAllocator extends 
TestContainerAllocatorCommon {
-
-  private final Config config = new MapConfig(new HashMap<String, String>() {
-    {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", 
"org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
-      put("yarn.package.path", "/foo");
-      put("task.inputs", "test-system.test-stream");
-      put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
-      put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.samza.host-affinity.enabled", "true");
-      put("yarn.container.request.timeout.ms", "3");
-      put("yarn.allocator.sleep.ms", "1");
-    }
-  });
-
-  @Override
-  protected Config getConfig() {
-    return config;
-  }
-
-  @Override
-  protected MockContainerRequestState 
createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> 
amClient) {
-    return new MockContainerRequestState(amClient, true);
-  }
-
-  /**
-   * Test request containers with no containerToHostMapping makes the right 
number of requests
-   */
-  @Test
-  public void testRequestContainersWithNoMapping() throws Exception {
-    int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, 
String>();
-    for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
-    }
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertEquals(4, requestState.getRequestsQueue().size());
-
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertEquals(1, requestState.getRequestsToCountMap().keySet().size());
-    
assertTrue(requestState.getRequestsToCountMap().keySet().contains(ANY_HOST));
-  }
-
-  /**
-   * Add containers to the correct host in the request state
-   */
-  @Test
-  public void testAddContainerWithHostAffinity() throws Exception {
-    containerAllocator.requestContainers(new HashMap<Integer, String>() {
-      {
-        put(0, "host1");
-        put(1, "host3");
-      }
-    });
-
-    assertNotNull(requestState.getContainersOnAHost("host1"));
-    assertEquals(0, requestState.getContainersOnAHost("host1").size());
-
-    assertNotNull(requestState.getContainersOnAHost("host3"));
-    assertEquals(0, requestState.getContainersOnAHost("host3").size());
-
-    assertNull(requestState.getContainersOnAHost(ANY_HOST));
-
-    
containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "host1", 123));
-    
containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"),
 "host2", 123));
-    
containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"),
 "host3", 123));
-
-    assertNotNull(requestState.getContainersOnAHost("host1"));
-    assertEquals(1, requestState.getContainersOnAHost("host1").size());
-
-    assertNotNull(requestState.getContainersOnAHost("host3"));
-    assertEquals(1, requestState.getContainersOnAHost("host3").size());
-
-    assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-    assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1);
-    
assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"),
-        requestState.getContainersOnAHost(ANY_HOST).get(0).getId());
-  }
-
-
-  @Test
-  public void testRequestContainers() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, 
String>() {
-      {
-        put(0, "host1");
-        put(1, "host2");
-        put(2, null);
-        put(3, "host1");
-      }
-    };
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(4, testAMRMClient.requests.size());
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    assertNotNull(requestState.getRequestsToCountMap());
-    Map<String, AtomicInteger> requestsMap = 
requestState.getRequestsToCountMap();
-
-    assertNotNull(requestsMap.get("host1"));
-    assertEquals(2, requestsMap.get("host1").get());
-
-    assertNotNull(requestsMap.get("host2"));
-    assertEquals(1, requestsMap.get("host2").get());
-
-    assertNotNull(requestsMap.get(ANY_HOST));
-    assertEquals(1, requestsMap.get(ANY_HOST).get());
-  }
-
-  /**
-   * Handles expired requests correctly and assigns ANY_HOST
-   */
-  @Test
-  public void testExpiredRequestHandling() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, 
String>() {
-      {
-        put(0, "requestedHost1");
-        put(1, "requestedHost2");
-      }
-    };
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 2);
-
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
-    
assertTrue(requestState.getRequestsToCountMap().get("requestedHost1").get() == 
1);
-
-    assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
-    
assertTrue(requestState.getRequestsToCountMap().get("requestedHost2").get() == 
1);
-
-    final Container container0 = TestUtil
-        
.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "availableHost1", 123);
-    final Container container1 = TestUtil
-        
.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"),
 "availableHost2", 123);
-
-    Runnable addedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        assertNotNull(requestState.getRequestsToCountMap());
-        assertNull(requestState.getContainersOnAHost("availableHost1"));
-        assertNull(requestState.getContainersOnAHost("availableHost2"));
-        assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-        assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
-      }
-    };
-
-    Runnable assignedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        List<Container> anyHostContainers = 
requestState.getContainersOnAHost(ANY_HOST);
-        assertTrue(anyHostContainers == null || anyHostContainers.isEmpty());
-
-        assertNotNull(requestState.getRequestsQueue());
-        assertTrue(requestState.getRequestsQueue().size() == 0);
-
-        assertNotNull(requestState.getRequestsToCountMap());
-        
assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
-        
assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
-      }
-    };
-
-    Runnable runningContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        MockContainerUtil mockContainerUtil = (MockContainerUtil) 
containerUtil;
-
-        
assertNotNull(mockContainerUtil.runningContainerList.get("availableHost1"));
-        
assertTrue(mockContainerUtil.runningContainerList.get("availableHost1").contains(container0));
-
-        
assertNotNull(mockContainerUtil.runningContainerList.get("availableHost2"));
-        
assertTrue(mockContainerUtil.runningContainerList.get("availableHost2").contains(container1));
-      }
-    };
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(
-        2, 0, 2, 2,
-        addedContainerAssertions,
-        null,
-        assignedContainerAssertions,
-        runningContainerAssertions);
-    requestState.registerContainerListener(listener);
-    ((MockContainerUtil) containerUtil).registerContainerListener(listener);
-
-    containerAllocator.addContainer(container0);
-    containerAllocator.addContainer(container1);
-
-    // Start after adding containers to avoid a race condition between the 
allocator thread
-    // using the containers and the assertions after the containers are added.
-    allocatorThread.start();
-
-    listener.verify();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
deleted file mode 100644
index ad0f4d3..0000000
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestSamzaContainerRequest {
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  @Test
-  public void testPreferredHostIsNeverNull() {
-    SamzaContainerRequest request = new SamzaContainerRequest(0, null);
-
-    assertNotNull(request.getPreferredHost());
-
-    // preferredHost is null, it should automatically default to ANY_HOST
-    assertTrue(request.getPreferredHost().equals(ANY_HOST));
-
-    SamzaContainerRequest request1 = new SamzaContainerRequest(1, "abc");
-    assertNotNull(request1.getPreferredHost());
-    assertTrue(request1.getPreferredHost().equals("abc"));
-  }
-
-  @Test
-  public void testAnyHostIsNotPassedToYarnRequest() {
-    SamzaContainerRequest request = new SamzaContainerRequest(0, null);
-    assertNull(request.getIssuedRequest().getNodes());
-
-    SamzaContainerRequest request1 = new SamzaContainerRequest(1, ANY_HOST);
-    assertNull(request1.getIssuedRequest().getNodes());
-  }
-}

Reply via email to