Repository: samza
Updated Branches:
  refs/heads/master 9f7abf535 -> 947472a0b


http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
new file mode 100644
index 0000000..57fef12
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHostAwareContainerAllocator {
+
+  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
+  private final Config config = getConfig();
+  private final JobModelManager reader = getJobModelManager(1);
+  private final SamzaApplicationState state = new SamzaApplicationState(reader);
+  private HostAwareContainerAllocator containerAllocator;
+  private final int timeoutMillis = 1000;
+  private MockContainerRequestState requestState;
+  private Thread allocatorThread;
+
+  /**
+   * Creates a fresh allocator for every test and replaces its request state with a
+   * {@link MockContainerRequestState} so tests can inspect it and register listeners on it.
+   * The allocator thread is created here; each test decides whether and when to start it.
+   */
+  @Before
+  public void setup() throws Exception {
+    containerAllocator = new HostAwareContainerAllocator(manager, timeoutMillis, config, state);
+    requestState = new MockContainerRequestState(manager, true);
+    // resourceRequestState is declared on the allocator's superclass, so the mock is injected reflectively.
+    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState");
+    requestStateField.setAccessible(true);
+    requestStateField.set(containerAllocator, requestState);
+    allocatorThread = new Thread(containerAllocator);
+  }
+
+
+  /**
+   * Requesting containers with no preferred host should create one pending request per container,
+   * all counted under {@link ResourceRequestState#ANY_HOST}.
+   */
+  @Test
+  public void testRequestContainersWithNoMapping() throws Exception {
+    int containerCount = 4;
+    Map<Integer, String> containersToHostMapping = new HashMap<>();
+    for (int i = 0; i < containerCount; i++) {
+      containersToHostMapping.put(i, null);
+    }
+
+    allocatorThread.start();
+
+    containerAllocator.requestResources(containersToHostMapping);
+
+    assertNotNull(requestState);
+
+    assertEquals(containerCount, requestState.numPendingRequests());
+
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertEquals(1, requestState.getRequestsToCountMap().keySet().size());
+    assertTrue(requestState.getRequestsToCountMap().keySet().contains(ResourceRequestState.ANY_HOST));
+  }
+
+  /**
+   * Resources offered on a requested host are filed under that host. Resources on any other host
+   * are filed under {@link ResourceRequestState#ANY_HOST}.
+   */
+  @Test
+  public void testAddContainerWithHostAffinity() throws Exception {
+    Map<Integer, String> containersToHostMapping = new HashMap<>();
+    containersToHostMapping.put(0, "abc");
+    containersToHostMapping.put(1, "xyz");
+    containerAllocator.requestResources(containersToHostMapping);
+
+    assertNotNull(requestState.getResourcesOnAHost("abc"));
+    assertEquals(0, requestState.getResourcesOnAHost("abc").size());
+
+    assertNotNull(requestState.getResourcesOnAHost("xyz"));
+    assertEquals(0, requestState.getResourcesOnAHost("xyz").size());
+
+    assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+
+    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1"));
+    containerAllocator.addResource(new SamzaResource(1, 10, "def", "ID2"));
+    containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID3"));
+
+    assertNotNull(requestState.getResourcesOnAHost("abc"));
+    assertEquals(1, requestState.getResourcesOnAHost("abc").size());
+
+    assertNotNull(requestState.getResourcesOnAHost("xyz"));
+    assertEquals(1, requestState.getResourcesOnAHost("xyz").size());
+
+    // "def" was never requested, so its resource (ID2) is expected under ANY_HOST.
+    assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+    assertEquals(1, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
+    assertEquals("ID2", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getResourceID());
+  }
+
+  /**
+   * Only one container is requested (on "abc"). Of the three resources offered, the two that are
+   * not needed must be released, and the request state must end up empty.
+   */
+  @Test
+  public void testAllocatorReleasesExtraContainers() throws Exception {
+    final SamzaResource resource0 = new SamzaResource(1, 1024, "abc", "id1");
+    final SamzaResource resource1 = new SamzaResource(1, 1024, "abc", "id2");
+    final SamzaResource resource2 = new SamzaResource(1, 1024, "def", "id3");
+
+    Runnable releasedAssertions = new Runnable() {
+      @Override
+      public void run() {
+        assertEquals(2, manager.releasedResources.size());
+        assertTrue(manager.releasedResources.contains(resource1));
+        assertTrue(manager.releasedResources.contains(resource2));
+
+        // Test that state is cleaned up
+        assertEquals(0, requestState.numPendingRequests());
+        assertEquals(0, requestState.getRequestsToCountMap().size());
+        assertNull(requestState.getResourcesOnAHost("abc"));
+        assertNull(requestState.getResourcesOnAHost("def"));
+      }
+    };
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedAssertions, null, null);
+    requestState.registerContainerListener(listener);
+
+    allocatorThread.start();
+
+    containerAllocator.requestResource(0, "abc");
+
+    containerAllocator.addResource(resource0);
+    containerAllocator.addResource(resource1);
+    containerAllocator.addResource(resource2);
+
+    listener.verify();
+  }
+
+  /**
+   * Requests are counted per preferred host. A request with a null host is counted under
+   * {@link ResourceRequestState#ANY_HOST}.
+   */
+  @Test
+  public void testRequestContainers() throws Exception {
+    Map<Integer, String> containersToHostMapping = new HashMap<>();
+    containersToHostMapping.put(0, "abc");
+    containersToHostMapping.put(1, "def");
+    containersToHostMapping.put(2, null);
+    containersToHostMapping.put(3, "abc");
+
+    containerAllocator.requestResources(containersToHostMapping);
+
+    assertNotNull(manager.resourceRequests);
+    assertEquals(4, manager.resourceRequests.size());
+    assertEquals(4, requestState.numPendingRequests());
+
+    Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap();
+    assertNotNull(requestsMap.get("abc"));
+    assertEquals(2, requestsMap.get("abc").get());
+
+    assertNotNull(requestsMap.get("def"));
+    assertEquals(1, requestsMap.get("def").get());
+
+    assertNotNull(requestsMap.get(ResourceRequestState.ANY_HOST));
+    assertEquals(1, requestsMap.get(ResourceRequestState.ANY_HOST).get());
+  }
+
+  /**
+   * If the container fails to start (e.g. because it cannot connect to an NM on a host that is
+   * down), the allocator should request a new container on a different host.
+   */
+  @Test
+  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
+
+    final SamzaResource container = new SamzaResource(1, 1024, "2", "id0");
+    final SamzaResource container1 = new SamzaResource(1, 1024, "1", "id1");
+    manager.nextException = new IOException("Cant connect to RM");
+
+    Runnable releasedAssertions = new Runnable() {
+      @Override
+      public void run() {
+        // The failed container should be released. The successful one should not.
+        assertNotNull(manager.releasedResources);
+        assertEquals(1, manager.releasedResources.size());
+        assertTrue(manager.releasedResources.contains(container));
+      }
+    };
+
+    Runnable assignedAssertions = new Runnable() {
+      @Override
+      public void run() {
+        // Test that the first request assignment had a preferred host and the retry didn't
+        assertEquals(2, requestState.assignedRequests.size());
+
+        SamzaResourceRequest request = requestState.assignedRequests.remove();
+        assertEquals(0, request.getContainerID());
+        assertEquals("2", request.getPreferredHost());
+
+        request = requestState.assignedRequests.remove();
+        assertEquals(0, request.getContainerID());
+        assertEquals(ResourceRequestState.ANY_HOST, request.getPreferredHost());
+
+        // This routine should be called after the retry is assigned, but before it's started.
+        // So there should still be 1 container needed.
+        assertEquals(1, state.neededResources.get());
+      }
+    };
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, releasedAssertions,
+        assignedAssertions, null);
+    state.neededResources.set(1);
+    requestState.registerContainerListener(listener);
+
+    // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
+    containerAllocator.requestResource(0, "2");
+    containerAllocator.addResource(container1);
+    containerAllocator.addResource(container);
+
+    allocatorThread.start();
+
+    listener.verify();
+  }
+
+
+  /**
+   * Handles expired requests correctly and assigns ANY_HOST.
+   */
+  @Test
+  public void testExpiredRequestHandling() throws Exception {
+    final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");
+    final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2");
+
+    Map<Integer, String> containersToHostMapping = new HashMap<>();
+    containersToHostMapping.put(0, "abc");
+    containersToHostMapping.put(1, "def");
+    containerAllocator.requestResources(containersToHostMapping);
+    assertEquals(2, requestState.numPendingRequests());
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertNotNull(requestState.getRequestsToCountMap().get("abc"));
+    assertEquals(1, requestState.getRequestsToCountMap().get("abc").get());
+
+    assertNotNull(requestState.getRequestsToCountMap().get("def"));
+    assertEquals(1, requestState.getRequestsToCountMap().get("def").get());
+
+    Runnable addContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        assertNull(requestState.getResourcesOnAHost("xyz"));
+        assertNull(requestState.getResourcesOnAHost("zzz"));
+        assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+        assertEquals(2, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
+      }
+    };
+
+    Runnable assignContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        assertEquals(0, requestState.numPendingRequests());
+        assertNotNull(requestState.getRequestsToCountMap());
+        assertNotNull(requestState.getRequestsToCountMap().get("abc"));
+        assertNotNull(requestState.getRequestsToCountMap().get("def"));
+      }
+    };
+
+    Runnable runningContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        assertTrue(manager.launchedResources.contains(resource0));
+        assertTrue(manager.launchedResources.contains(resource1));
+      }
+    };
+    MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null,
+        assignContainerAssertions, runningContainerAssertions);
+    requestState.registerContainerListener(listener);
+    manager.registerContainerListener(listener);
+    containerAllocator.addResource(resource0);
+    containerAllocator.addResource(resource1);
+    allocatorThread.start();
+
+    listener.verify();
+  }
+
+  /**
+   * Stops the allocator before the job model manager. The allocator is stopped first so a
+   * failure in {@code reader.stop()} cannot leave it running into the next test. The
+   * {@code finally} makes sure the job model manager is stopped even if stopping the allocator fails.
+   */
+  @After
+  public void teardown() throws Exception {
+    try {
+      containerAllocator.stop();
+    } finally {
+      reader.stop();
+    }
+  }
+
+
+  /**
+   * Builds the job config shared by the allocator and the job model (host affinity enabled).
+   */
+  private static Config getConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.container.count", "1");
+    map.put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
+    map.put("yarn.container.memory.mb", "512");
+    map.put("yarn.package.path", "/foo");
+    map.put("task.inputs", "test-system.test-stream");
+    map.put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
+    map.put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
+    map.put("yarn.container.retry.count", "1");
+    map.put("yarn.container.retry.window.ms", "1999999999");
+    map.put("yarn.samza.host-affinity.enabled", "true");
+    map.put("yarn.container.request.timeout.ms", "3");
+    map.put("yarn.allocator.sleep.ms", "1");
+    return new MapConfig(map);
+  }
+
+  /**
+   * Builds a {@link JobModelManager} over {@code containerCount} empty containers, backed by a
+   * {@link MockHttpServer}.
+   */
+  private static JobModelManager getJobModelManager(int containerCount) {
+    // Ideally, the JobModelManager should be constructed independent of HttpServer.
+    // That way it becomes easier to mock objects. Save it for later.
+
+    HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+    Map<Integer, ContainerModel> containers = new HashMap<>();
+    for (int i = 0; i < containerCount; i++) {
+      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
+      containers.put(i, container);
+    }
+    JobModel jobModel = new JobModel(getConfig(), containers);
+    return new JobModelManager(jobModel, server, null);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 9df12d2..1358fdd 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConversions._
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.server.{ServletBase, HttpServer, 
JobServlet}
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
@@ -76,9 +76,9 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
-    val coordinator = new JobCoordinator(jobModel, server)
-    JobCoordinator.jobModelRef.set(jobModelGenerator())
-    coordinator.server.addServlet("/*", new 
JobServlet(JobCoordinator.jobModelRef))
+    val coordinator = new JobModelManager(jobModel, server)
+    JobModelManager.jobModelRef.set(jobModelGenerator())
+    coordinator.server.addServlet("/*", new 
JobServlet(JobModelManager.jobModelRef))
     try {
       coordinator.start
       assertEquals(jobModel, 
SamzaContainer.readJobModel(server.getUrl.toString))
@@ -101,9 +101,9 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
-    val coordinator = new JobCoordinator(jobModel, server)
-    JobCoordinator.jobModelRef.set(jobModelGenerator())
-    val mockJobServlet = new MockJobServlet(2, JobCoordinator.jobModelRef)
+    val coordinator = new JobModelManager(jobModel, server)
+    JobModelManager.jobModelRef.set(jobModelGenerator())
+    val mockJobServlet = new MockJobServlet(2, JobModelManager.jobModelRef)
     coordinator.server.addServlet("/*", mockJobServlet)
     try {
       coordinator.start

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 110c3a9..ffdb006 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -98,12 +98,12 @@ class TestJobCoordinator {
     // We want the mocksystemconsumer to use the same instance across runs
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
-    val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+    val coordinator = JobModelManager(new MapConfig(config ++ otherConfigs))
     val expectedJobModel = new JobModel(new MapConfig(config), containers)
 
     // Verify that the atomicReference is initialized
-    assertNotNull(JobCoordinator.jobModelRef.get())
-    assertEquals(expectedJobModel, JobCoordinator.jobModelRef.get())
+    assertNotNull(JobModelManager.jobModelRef.get())
+    assertEquals(expectedJobModel, JobModelManager.jobModelRef.get())
 
     coordinator.start
     assertEquals(new MapConfig(config), coordinator.jobModel.getConfig)
@@ -123,7 +123,6 @@ class TestJobCoordinator {
 
   @Test
   def testJobCoordinatorChangelogPartitionMapping = {
-    System.out.println("test  ")
     val task0Name = new TaskName("Partition 0")
     val ssp0 = Set(new SystemStreamPartition("test", "stream1", new 
Partition(0)))
     val task1Name = new TaskName("Partition 1")
@@ -165,7 +164,7 @@ class TestJobCoordinator {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
     // start the job coordinator and verify if it has all the checkpoints 
through http port
-    val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+    val coordinator = JobModelManager(new MapConfig(config ++ otherConfigs))
     coordinator.start
     val url = coordinator.server.getUrl.toString
 

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala 
b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index 3a710a8..a55af50 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -19,7 +19,7 @@
 
 package org.apache.samza.job.local;
 
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -33,7 +33,7 @@ class TestProcessJob {
       override def buildCommand = "sleep 1"
       override def buildEnvironment = Map[String, String]()
     }
-    val coordinator = new MockJobCoordinator()
+    val coordinator = new MockJobModelManager()
     val job = new ProcessJob(commandBuilder, coordinator)
     job.submit
     job.waitForFinish(999999)
@@ -45,7 +45,7 @@ class TestProcessJob {
       override def buildCommand = "sleep 999999999"
       override def buildEnvironment = Map[String, String]()
     }
-    val coordinator = new MockJobCoordinator()
+    val coordinator = new MockJobModelManager()
     val job = new ProcessJob(commandBuilder, coordinator)
     job.submit
     job.waitForFinish(500)
@@ -56,7 +56,7 @@ class TestProcessJob {
   }
 }
 
-class MockJobCoordinator extends JobCoordinator(null, null) {
+class MockJobModelManager extends JobModelManager(null, null) {
   var stopped: Boolean = false
 
   override def start: Unit = { }

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 0c6329e..ca4eb7f 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -32,7 +32,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -103,7 +103,7 @@ public class StreamAppender extends AppenderSkeleton {
       try {
         recursiveCall.set(true);
         if (!systemInitialized) {
-          if (JobCoordinator.currentJobCoordinator() != null) {
+          if (JobModelManager.currentJobModelManager() != null) {
             // JobCoordinator has been instantiated
             setupSystem();
             systemInitialized = true;
@@ -173,7 +173,7 @@ public class StreamAppender extends AppenderSkeleton {
 
     try {
       if (isApplicationMaster) {
-        config = JobCoordinator.currentJobCoordinator().jobModel().getConfig();
+        config = 
JobModelManager.currentJobModelManager().jobModel().getConfig();
       } else {
         String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
         config = SamzaObjectMapper.getObjectMapper()

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-shell/src/main/bash/run-am.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-am.sh 
b/samza-shell/src/main/bash/run-am.sh
index 9545a96..ca938cc 100755
--- a/samza-shell/src/main/bash/run-am.sh
+++ b/samza-shell/src/main/bash/run-am.sh
@@ -22,4 +22,4 @@
 # Set container name system properties for use in Log4J
 [[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS 
-Dsamza.container.name=samza-application-master"
 
-exec $(dirname $0)/run-class.sh org.apache.samza.job.yarn.SamzaAppMaster "$@"
+exec $(dirname $0)/run-class.sh org.apache.samza.job.yarn.SamzaAppMaster "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-shell/src/main/bash/run-jc.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-jc.sh 
b/samza-shell/src/main/bash/run-jc.sh
new file mode 100644
index 0000000..cf3e734
--- /dev/null
+++ b/samza-shell/src/main/bash/run-jc.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Check if server is set. If not - set server optimization
+[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server"
+
+# Set container name system properties for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-job-coordinator"
+
+# Hand off to the run-class.sh that sits next to this script to launch the cluster-based job coordinator.
+# $0 and the dirname result are quoted so installs under paths containing spaces still work.
+exec "$(dirname "$0")"/run-class.sh org.apache.samza.clustermanager.ClusterBasedJobCoordinator "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
index 77280ba..c116ed8 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 
 import java.net.URL;
@@ -45,7 +45,7 @@ public class SamzaAppState {
    * lifecycle. It helps querying JobModel related info in {@link 
org.apache.samza.webapp.ApplicationMasterRestServlet}
    * and locality information when host-affinity is enabled in {@link 
org.apache.samza.job.yarn.SamzaTaskManager}
    */
-  public final JobCoordinator jobCoordinator;
+  public final JobModelManager jobCoordinator;
 
   /*  The following state variables are primarily used for reference in the AM 
web services   */
   /**
@@ -168,13 +168,13 @@ public class SamzaAppState {
 
   public AtomicInteger matchedContainerRequests = new AtomicInteger(0);
 
-  public SamzaAppState(JobCoordinator jobCoordinator,
+  public SamzaAppState(JobModelManager jobModelManager,
                        int taskId,
                        ContainerId amContainerId,
                        String nodeHost,
                        int nodePort,
                        int nodeHttpPort) {
-    this.jobCoordinator = jobCoordinator;
+    this.jobCoordinator = jobModelManager;
     this.taskId = taskId;
     this.amContainerId = amContainerId;
     this.nodeHost = nodeHost;

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
index caee6e6..bc95f31 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
@@ -139,7 +139,7 @@ class SamzaTaskManager implements YarnAppMasterListener {
   }
 
   /**
-   * This methods handles the onContainerCompleted callback from the RM. Based 
on the ContainerExitStatus, it decides
+   * This method handles the onResourceCompleted callback from the RM. Based on the ContainerExitStatus, it decides
    * whether a container that exited is marked as complete or failure.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
new file mode 100644
index 0000000..57092e1
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.job.yarn.YarnContainer;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * YarnAppState encapsulates Yarn-specific state variables. This class
+ * is useful for information to display in the UI.
+ *
+ * TODO: make these variables private, provide thread-safe accessors.
+ * Changes to the variables in YarnAppState are deferred because they are used by the UI, and changes to
+ * variable names, data structures etc. will require changes to the UI scaml templates too. This is tracked
+ * as a part of SAMZA-902.
+ */
+
+public class YarnAppState {
+
+  /**
+   /**
+  * State indicating whether the job is healthy or not
+  * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
+  */
+
+  public Map<Integer, YarnContainer> runningYarnContainers = new 
ConcurrentHashMap<Integer, YarnContainer>()  ;
+
+  public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new 
ConcurrentHashMap<String, ContainerStatus>();
+
+  public YarnAppState(JobModelManager jobModelManager,
+                      int taskId,
+                      ContainerId amContainerId,
+                      String nodeHost,
+                      int nodePort,
+                      int nodeHttpPort,
+                      SamzaApplicationState state) {
+    this.jobModelManager = jobModelManager;
+    this.taskId = taskId;
+    this.amContainerId = amContainerId;
+    this.nodeHost = nodeHost;
+    this.nodePort = nodePort;
+    this.nodeHttpPort = nodeHttpPort;
+    this.appAttemptId = amContainerId.getApplicationAttemptId();
+    this.samzaAppState = state;
+  }
+
+
+  @Override
+  public String toString() {
+    return "YarnAppState{" +
+        "samzaAppState=" + samzaAppState +
+        ", jobModelReader=" + jobModelManager +
+        ", taskId=" + taskId +
+        ", amContainerId=" + amContainerId +
+        ", nodeHost='" + nodeHost + '\'' +
+        ", nodePort=" + nodePort +
+        ", nodeHttpPort=" + nodeHttpPort +
+        ", appAttemptId=" + appAttemptId +
+        ", coordinatorUrl=" + coordinatorUrl +
+        ", rpcUrl=" + rpcUrl +
+        ", trackingUrl=" + trackingUrl +
+        ", runningYarnContainers=" + runningYarnContainers +
+        ", failedContainersStatus=" + failedContainersStatus +
+        '}';
+  }
+
+  public final SamzaApplicationState samzaAppState;
+   /* The following state variables are primarily used for reference in the AM 
web services   */
+
+  /**
+   * Task Id of the AM
+   * Used for displaying in the AM UI. Usage found in {@link 
org.apache.samza.webapp.ApplicationMasterRestServlet}
+   * and scalate/WEB-INF/views/index.scaml
+   */
+  public final JobModelManager jobModelManager;
+
+  public final int taskId;
+  /**
+   * Id of the AM container (as allocated by the RM)
+   * Used for displaying in the AM UI. Usage in {@link 
org.apache.samza.webapp.ApplicationMasterRestServlet}
+   * and scalate/WEB-INF/views/index.scaml
+   */
+  public final ContainerId amContainerId;
+  /**
+   * Host name of the NM on which the AM is running
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public final String nodeHost;
+  /**
+   * NM port on which the AM is running
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public final int nodePort;
+  /**
+   * Http port of the NM on which the AM is running
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public final int nodeHttpPort;
+  /**
+   * Application Attempt Id as provided by Yarn
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   * and {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   */
+  public final ApplicationAttemptId appAttemptId;
+
+  //TODO: Make the below 3 variables immutable. Tracked as a part of 
SAMZA-902. Save for later.
+  /**
+   * Job Coordinator URL
+   * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} &amp; 
YarnContainerRunner
+   */
+  public URL coordinatorUrl = null;
+  /**
+   * URL of the {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   */
+  public URL rpcUrl = null;
+  /**
+   * URL of the {@link org.apache.samza.webapp.ApplicationMasterWebServlet}
+   */
+  public URL trackingUrl = null;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
new file mode 100644
index 0000000..7778a38
--- /dev/null
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.*;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+import org.apache.samza.clustermanager.SamzaContainerLaunchException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.YarnConfig;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.job.CommandBuilder;
+import org.apache.samza.job.yarn.YarnContainer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ * An {@link YarnClusterResourceManager} implements a ClusterResourceManager 
using Yarn as the underlying
+ * resource manager. This class is as an adaptor between Yarn and translates 
Yarn callbacks into
+ * Samza specific callback methods as specified in Callback.
+ *
+ * Thread-safety:
+ * 1.Start and stop methods should  NOT be called from multiple threads.
+ * 2.ALL callbacks from the YarnContainerManager are invoked from a single 
Callback thread of the AMRMClient.
+ * 3.Stop should not be called more than once.
+ *
+ */
+
+public class YarnClusterResourceManager extends ClusterResourceManager 
implements AMRMClientAsync.CallbackHandler {
+
+  private final int INVALID_YARN_CONTAINER_ID = -1;
+
+  /**
+   * The containerProcessManager instance to request resources from yarn.
+   */
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
+
+  /**
+   * A helper class to launch Yarn containers.
+   */
+  private final YarnContainerRunner yarnContainerRunner;
+
+  /**
+   * Configuration and state specific to Yarn.
+   */
+  private final YarnConfiguration hConfig;
+  private final YarnAppState state;
+
+  /**
+   * SamzaYarnAppMasterLifecycle is responsible for registering, unregistering 
the AM client.
+   */
+  private final SamzaYarnAppMasterLifecycle lifecycle;
+
+  /**
+   * SamzaAppMasterService is responsible for hosting an AM web UI. This picks 
up data from both
+   * SamzaAppState and YarnAppState.
+   */
+  private final SamzaYarnAppMasterService service;
+
+
+  /**
+   * State variables to map Yarn specific callbacks into Samza specific 
callbacks.
+   */
+  private final ConcurrentHashMap<SamzaResource, Container> allocatedResources 
= new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SamzaResourceRequest, 
AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();
+
+  final AtomicBoolean started = new AtomicBoolean(false);
+  private final Object lock = new Object();
+
+  private static final Logger log = 
LoggerFactory.getLogger(YarnClusterResourceManager.class);
+
+  /**
+   * Creates an YarnClusterResourceManager from config, a jobModelReader and a 
callback.
+   * @param config to instantiate the container manager with
+   * @param jobModelManager the jobModel manager to get the job model (mostly 
for the UI)
+   * @param callback the callback to receive events from Yarn.
+   * @param samzaAppState samza app state for display in the UI
+   */
+  public YarnClusterResourceManager(Config config, JobModelManager 
jobModelManager, ClusterResourceManager.Callback callback, 
SamzaApplicationState samzaAppState ) {
+    super(callback);
+    hConfig = new YarnConfiguration();
+    hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
+
+    MetricsRegistryMap registry = new MetricsRegistryMap();
+
+    // parse configs from the Yarn environment
+    String containerIdStr = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    String nodeHostString = 
System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
+    String nodePortString = 
System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
+    String nodeHttpPortString = 
System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());
+
+    int nodePort = Integer.parseInt(nodePortString);
+    int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
+    YarnConfig yarnConfig = new YarnConfig(config);
+    int interval = yarnConfig.getAMPollIntervalMs();
+
+    //Instantiate the AM Client.
+    this.amClient = AMRMClientAsync.createAMRMClientAsync(interval, this);
+
+    this.state = new YarnAppState(jobModelManager, -1, containerId, 
nodeHostString, nodePort, nodeHttpPort, samzaAppState);
+
+    log.info("Initialized YarnAppState: {}", state.toString());
+    this.service = new SamzaYarnAppMasterService(config, this.state, registry);
+
+    log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport 
{}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
+    this.lifecycle = new 
SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), 
yarnConfig.getContainerMaxCpuCores(), state, amClient );
+
+    yarnContainerRunner = new YarnContainerRunner(config, hConfig);
+  }
+
+  /**
+   * Starts the YarnContainerManager and initialize all its sub-systems.
+   * Attempting to start an already started container manager will return 
immediately.
+   */
+  @Override
+  public void start() {
+    if(!started.compareAndSet(false, true)) {
+      log.info("Attempting to start an already started ContainerManager");
+      return;
+    }
+    service.onInit();
+    log.info("Starting YarnContainerManager.");
+    amClient.init(hConfig);
+    amClient.start();
+    lifecycle.onInit();
+
+    if(lifecycle.shouldShutdown()) {
+      clusterManagerCallback.onError(new SamzaException("Invalid resource 
request."));
+    }
+
+    log.info("Finished starting YarnContainerManager");
+  }
+
+  /**
+   * Request resources for running container processes.
+   */
+  @Override
+  public void requestResources(SamzaResourceRequest resourceRequest) {
+    final int DEFAULT_PRIORITY = 0;
+    log.info("Requesting resources on  " + resourceRequest.getPreferredHost() 
+ " for container " + resourceRequest.getContainerID());
+
+    int memoryMb = resourceRequest.getMemoryMB();
+    int cpuCores = resourceRequest.getNumCores();
+    String preferredHost = resourceRequest.getPreferredHost();
+    Resource capability = Resource.newInstance(memoryMb, cpuCores);
+    Priority priority =  Priority.newInstance(DEFAULT_PRIORITY);
+
+    AMRMClient.ContainerRequest issuedRequest;
+
+    if (preferredHost.equals("ANY_HOST"))
+    {
+      log.info("Making a request for ANY_HOST " + preferredHost );
+      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, 
priority);
+    }
+    else
+    {
+      log.info("Making a preferred host request on " + preferredHost);
+      issuedRequest = new AMRMClient.ContainerRequest(
+              capability,
+              new String[]{preferredHost},
+              null,
+              priority);
+    }
+    //ensure that updating the state and making the request are done 
atomically.
+    synchronized (lock) {
+      requestsMap.put(resourceRequest, issuedRequest);
+      amClient.addContainerRequest(issuedRequest);
+    }
+  }
+
+  /**
+   * Requests the YarnContainerManager to release a resource. If the app 
cannot use the resource or wants to give up
+   * the resource, it can release them.
+   *
+   * @param resource to be released
+   */
+
+  @Override
+  public void releaseResources(SamzaResource resource) {
+    log.info("Release resource invoked {} ", resource);
+    //ensure that updating state and removing the request are done atomically
+    synchronized (lock) {
+      Container container = allocatedResources.get(resource);
+      if (container == null) {
+        log.info("Resource {} already released. ", resource);
+        return;
+      }
+      amClient.releaseAssignedContainer(container.getId());
+      allocatedResources.remove(resource);
+    }
+  }
+
+  /**
+   *
+   * Requests the launch of a StreamProcessor with the specified ID on the 
resource
+   * @param resource , the SamzaResource on which to launch the StreamProcessor
+   * @param builder, the builder to build the resource launch command from
+   *
+   * TODO: Support non-builder methods to launch resources. Maybe, refactor 
into a ContainerLaunchStrategy interface
+   */
+
+  @Override
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder 
builder) throws SamzaContainerLaunchException {
+    String containerIDStr = 
builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
+    int containerID = Integer.parseInt(containerIDStr);
+    log.info("Received launch request for {} on hostname {}", containerID , 
resource.getHost());
+
+    synchronized (lock) {
+      Container container = allocatedResources.get(resource);
+      if (container == null) {
+        log.info("Resource {} already released. ", resource);
+        return;
+      }
+
+      state.runningYarnContainers.put(containerID, new 
YarnContainer(container));
+      yarnContainerRunner.runContainer(containerID, container, builder);
+    }
+  }
+
+  /**
+   * Given a lookupContainerId from Yarn (for example: containerId_app_12345, 
this method returns the SamzaContainer ID
+   * in the range [0,N-1] that maps to it.
+   * @param lookupContainerId  the Yarn container ID.
+   * @return  the samza container ID.
+   */
+
+  //TODO: Get rid of the YarnContainer object and just use Container in 
state.runningYarnContainers hashmap.
+  //In that case, this scan will turn into a lookup. This change will require 
changes/testing in the UI files because
+  //those UI stub templates operate on the YarnContainer object.
+
+  private int getIDForContainer(String lookupContainerId) {
+    int samzaContainerID = INVALID_YARN_CONTAINER_ID;
+    for(Map.Entry<Integer, YarnContainer> entry : 
state.runningYarnContainers.entrySet()) {
+      Integer key = entry.getKey();
+      YarnContainer yarnContainer = entry.getValue();
+      String yarnContainerId = yarnContainer.id().toString();
+      if(yarnContainerId.equals(lookupContainerId)) {
+        return key;
+      }
+    }
+    return samzaContainerID;
+  }
+
+
+  /**
+   *
+   * Remove a previously submitted resource request. The previous container 
request may have
+   * been submitted. Even after the remove request, a Callback implementation 
must
+   * be prepared to receive an allocation for the previous request. This is 
merely a best effort cancellation.
+   *
+   * @param request the request to be cancelled
+   */
+  @Override
+  public void cancelResourceRequest(SamzaResourceRequest request) {
+    log.info("Cancelling request {} ", request);
+    //ensure that removal and cancellation are done atomically.
+    synchronized (lock) {
+      AMRMClient.ContainerRequest containerRequest = requestsMap.get(request);
+      if (containerRequest == null) {
+        log.info("Cancellation of {} already done. ", containerRequest);
+        return;
+      }
+      requestsMap.remove(request);
+      amClient.removeContainerRequest(containerRequest);
+    }
+  }
+
+
+  /**
+   * Stops the YarnContainerManager and all its sub-components.
+   * Stop should NOT be called from multiple threads.
+   * TODO: fix this to make stop idempotent?.
+   */
+  @Override
+  public void stop(SamzaApplicationState.SamzaAppStatus status) {
+    log.info("Stopping AM client " );
+    lifecycle.onShutdown(status);
+    amClient.stop();
+    log.info("Stopping the AM service " );
+    service.onShutdown();
+  }
+
+  /**
+   * Callback invoked from Yarn when containers complete. This translates the 
yarn callbacks into Samza specific
+   * ones.
+   *
+   * @param statuses the YarnContainerStatus callbacks from Yarn.
+   */
+  @Override
+  public void onContainersCompleted(List<ContainerStatus> statuses) {
+    List<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<>();
+
+    for(ContainerStatus status: statuses) {
+      log.info("Container completed from RM " + status);
+
+      SamzaResourceStatus samzaResrcStatus = new 
SamzaResourceStatus(status.getContainerId().toString(), 
status.getDiagnostics(), status.getExitStatus());
+      samzaResrcStatuses.add(samzaResrcStatus);
+
+      int completedContainerID = 
getIDForContainer(status.getContainerId().toString());
+      log.info("Completed container had ID: {}", completedContainerID);
+
+      //remove the container from the list of running containers, if failed 
with a non-zero exit code, add it to the list of
+      //failed containers.
+      if(completedContainerID != INVALID_YARN_CONTAINER_ID){
+        if(state.runningYarnContainers.containsKey(completedContainerID)) {
+          log.info("Removing container ID {} from completed containers", 
completedContainerID);
+          state.runningYarnContainers.remove(completedContainerID);
+
+          if(status.getExitStatus() != ContainerExitStatus.SUCCESS)
+            
state.failedContainersStatus.put(status.getContainerId().toString(), status);
+        }
+      }
+    }
+    clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
+  }
+
+  /**
+   * Callback invoked from Yarn when containers are allocated. This translates 
the yarn callbacks into Samza
+   * specific ones.
+   * @param containers the list of {@link Container} returned by Yarn.
+   */
+  @Override
+  public void onContainersAllocated(List<Container> containers) {
+      List<SamzaResource> resources = new ArrayList<SamzaResource>();
+      for(Container container : containers) {
+          log.info("Container allocated from RM on " + 
container.getNodeId().getHost());
+          final String id = container.getId().toString();
+          String host = container.getNodeId().getHost();
+          int memory = container.getResource().getMemory();
+          int numCores = container.getResource().getVirtualCores();
+
+          SamzaResource resource = new SamzaResource(numCores, memory, host, 
id);
+          allocatedResources.put(resource, container);
+          resources.add(resource);
+      }
+      clusterManagerCallback.onResourcesAvailable(resources);
+  }
+
+  //The below methods are specific to the Yarn AMRM Client. We currently don't 
handle scenarios where there are
+  //nodes being updated. We always return 0 when asked for progress by Yarn.
+  @Override
+  public void onShutdownRequest() {
+    //not implemented currently.
+  }
+
+  @Override
+  public void onNodesUpdated(List<NodeReport> updatedNodes) {
+    //not implemented currently.
+  }
+
+  @Override
+  public float getProgress() {
+    //not implemented currently.
+    return 0;
+  }
+
+  /**
+   * Callback invoked when there is an error in the Yarn client. This 
delegates the
+   * callback handling to the {@link ClusterResourceManager.Callback} instance.
+   *
+   */
+  @Override
+  public void onError(Throwable e) {
+    log.error("Exception in the Yarn callback {}", e);
+    clusterManagerCallback.onError(e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
new file mode 100644
index 0000000..dacc52d
--- /dev/null
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.samza.clustermanager.SamzaContainerLaunchException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.YarnConfig;
+import org.apache.samza.job.CommandBuilder;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * A Helper class to run container processes on Yarn. This encapsulates quite 
a bit of YarnContainer
+ * boiler plate.
+ */
+public class YarnContainerRunner {
+  private static final Logger log = 
LoggerFactory.getLogger(YarnContainerRunner.class);
+
+  private final Config config;
+  private final YarnConfiguration yarnConfiguration;
+
+  private final NMClient nmClient;
+  private final YarnConfig yarnConfig;
+
+  /**
+   * Create a new Runner from a Config.
+   * @param config to instantiate the runner with
+   * @param yarnConfiguration the yarn config for the cluster to connect to.
+   */
+
+  public YarnContainerRunner(Config config,
+                             YarnConfiguration yarnConfiguration) {
+    this.config = config;
+    this.yarnConfiguration = yarnConfiguration;
+
+    this.nmClient = NMClient.createNMClient();
+    nmClient.init(this.yarnConfiguration);
+
+    this.yarnConfig = new YarnConfig(config);
+  }
+
+  /**
+   * Runs a process as specified by the command builder on the container.
+   * @param samzaContainerId id of the samza Container to run (passed as a 
command line parameter to the process)
+   * @param container the samza container to run.
+   * @param cmdBuilder the command builder that encapsulates the command, and 
the context
+   *
+   * @throws SamzaContainerLaunchException  when there's an exception in 
submitting the request to the RM.
+   *
+   */
+  public void runContainer(int samzaContainerId, Container container, 
CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
+    String containerIdStr = ConverterUtils.toString(container.getId());
+    log.info("Got available container ID ({}) for container: {}", 
samzaContainerId, container);
+
+    // check if we have framework path specified. If yes - use it, if not use 
default ./__package/
+    String jobLib = ""; // in case of separate framework, this directory will 
point at the job's libraries
+    String cmdPath = "./__package/";
+
+    String fwkPath = JobConfig.getFwkPath(config);
+    if(fwkPath != null && (! fwkPath.isEmpty())) {
+      cmdPath = fwkPath;
+      jobLib = "export JOB_LIB_DIR=./__package/lib";
+    }
+    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + 
cmdPath + ";jobLib=" + jobLib);
+    cmdBuilder.setCommandPath(cmdPath);
+
+
+    String command = cmdBuilder.buildCommand();
+    log.info("Container ID {} using command {}", samzaContainerId, command);
+
+    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
+    printContainerEnvironmentVariables(samzaContainerId, env);
+
+    log.info("Samza FWK path: " + command + "; env=" + env);
+
+    Path path = new Path(yarnConfig.getPackagePath());
+    log.info("Starting container ID {} using package path {}", 
samzaContainerId, path);
+
+    startContainer(
+        path,
+        container,
+        env,
+        getFormattedCommand(
+            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+            jobLib,
+            command,
+            ApplicationConstants.STDOUT,
+            ApplicationConstants.STDERR)
+    );
+
+
+    log.info("Claimed container ID {} for container {} on node {} 
(http://{}/node/containerlogs/{}).",
+        new Object[]{
+            samzaContainerId,
+            containerIdStr,
+            container.getNodeId().getHost(),
+            container.getNodeHttpAddress(),
+            containerIdStr}
+    );
+
+      log.info("Started container ID {}", samzaContainerId);
+  }
+
+  /**
+   *    Runs a command as a process on the container. All binaries needed by 
the physical process are packaged in the URL
+   *    specified by packagePath.
+   */
+  private void startContainer(Path packagePath,
+                                Container container,
+                                Map<String, String> env,
+                                final String cmd) throws 
SamzaContainerLaunchException {
+    log.info("starting container {} {} {} {}",
+        new Object[]{packagePath, container, env, cmd});
+
+    // set the local package so that the containers and app master are 
provisioned with it
+    LocalResource packageResource = Records.newRecord(LocalResource.class);
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
+    FileStatus fileStatus;
+    try {
+      fileStatus = 
packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
+    } catch (IOException ioe) {
+      log.error("IO Exception when accessing the package status from the 
filesystem", ioe);
+      throw new SamzaContainerLaunchException("IO Exception when accessing the 
package status from the filesystem");
+    }
+
+    packageResource.setResource(packageUrl);
+    packageResource.setSize(fileStatus.getLen());
+    packageResource.setTimestamp(fileStatus.getModificationTime());
+    packageResource.setType(LocalResourceType.ARCHIVE);
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+    ByteBuffer allTokens;
+    // copy tokens (copied from dist shell example)
+    try {
+      Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+
+      // now remove the AM->RM token so that containers cannot access it
+      Iterator iter = credentials.getAllTokens().iterator();
+      while (iter.hasNext()) {
+        TokenIdentifier token = ((Token) iter.next()).decodeIdentifier();
+        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+          iter.remove();
+        }
+      }
+      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    } catch (IOException ioe) {
+      log.error("IOException when writing credentials.", ioe);
+      throw new SamzaContainerLaunchException("IO Exception when writing 
credentials to output buffer");
+    }
+
+    ContainerLaunchContext context = 
Records.newRecord(ContainerLaunchContext.class);
+    context.setEnvironment(env);
+    context.setTokens(allTokens.duplicate());
+    context.setCommands(new ArrayList<String>() {{add(cmd);}});
+    context.setLocalResources(Collections.singletonMap("__package", 
packageResource));
+
+    log.debug("setting package to {}", packageResource);
+    log.debug("setting context to {}", context);
+
+    StartContainerRequest startContainerRequest = 
Records.newRecord(StartContainerRequest.class);
+    startContainerRequest.setContainerLaunchContext(context);
+    try {
+      nmClient.startContainer(container, context);
+    } catch (YarnException ye) {
+      log.error("Received YarnException when starting container: " + 
container.getId(), ye);
+      throw new SamzaContainerLaunchException("Received YarnException when 
starting container: " + container.getId(), ye);
+    } catch (IOException ioe) {
+      log.error("Received IOException when starting container: " + 
container.getId(), ioe);
+      throw new SamzaContainerLaunchException("Received IOException when 
starting container: " + container.getId(), ioe);
+    }
+  }
+
+
+  /**
+   * @param samzaContainerId  the Samza container Id for logging purposes.
+   * @param env               the Map of environment variables to their 
respective values.
+   */
+  private void printContainerEnvironmentVariables(int samzaContainerId, 
Map<String, String> env) {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, String> entry : env.entrySet()) {
+      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
+    }
+    log.info("Container ID {} using environment variables: {}", 
samzaContainerId, sb.toString());
+  }
+
+
+  /**
+   * Gets the environment variables from the specified {@link CommandBuilder} 
and escapes certain characters.
+   *
+   * @param cmdBuilder        the command builder containing the environment 
variables.
+   * @return                  the map containing the escaped environment 
variables.
+   */
+  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder 
cmdBuilder) {
+    Map<String, String> env = new HashMap<String, String>();
+    for (Map.Entry<String, String> entry : 
cmdBuilder.buildEnvironment().entrySet()) {
+      String escapedValue = Util.envVarEscape(entry.getValue());
+      env.put(entry.getKey(), escapedValue);
+    }
+    return env;
+  }
+
+
+  private String getFormattedCommand(String logDirExpansionVar,
+                                     String jobLib,
+                                     String command,
+                                     String stdOut,
+                                     String stdErr) {
+    if (!jobLib.isEmpty()) {
+      jobLib = "&& " + jobLib; // add job's libraries exported to an env 
variable
+    }
+
+    return String
+        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 
1>logs/%s 2>logs/%s", logDirExpansionVar,
+            jobLib, logDirExpansionVar, command, stdOut, stdErr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
new file mode 100644
index 0000000..988a8e8
--- /dev/null
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.ResourceManagerFactory;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobModelManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ResourceManagerFactory} that returns a {@link ClusterResourceManager} for Yarn,
+ * namely a {@link YarnClusterResourceManager}.
+ */
+public class YarnResourceManagerFactory implements ResourceManagerFactory {
+
+  private static final Logger log = LoggerFactory.getLogger(YarnResourceManagerFactory.class);
+
+  /**
+   * Creates a {@link YarnClusterResourceManager} for the job described by {@code state}.
+   *
+   * @param callback  callback passed through to the new resource manager.
+   * @param state     application state; its {@code jobModelManager} supplies the job model,
+   *                  whose config is used to build the resource manager.
+   * @return          a new {@link YarnClusterResourceManager}.
+   */
+  @Override
+  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
+    log.info("Creating an instance of a cluster resource manager for Yarn. ");
+    JobModelManager jobModelManager = state.jobModelManager;
+    Config config = jobModelManager.jobModel().getConfig();
+    return new YarnClusterResourceManager(config, jobModelManager, callback, state);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 70f1e4f..c47e8d1 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -35,8 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
@@ -149,9 +148,9 @@ public class YarnJobValidationTool {
   }
 
   public void validateJmxMetrics() throws Exception {
-    JobCoordinator jobCoordinator = JobCoordinator.apply(config);
+    JobModelManager jobModelManager = JobModelManager.apply(config);
     validator.init(config);
-    Map<Integer, String> jmxUrls = 
jobCoordinator.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+    Map<Integer, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) {
       Integer containerId = entry.getKey();
       String jmxUrl = entry.getValue();

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 80deb3b..7bd8131 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -36,7 +36,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.util.hadoop.HttpFileSystem
 import org.apache.samza.util.Logging
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.SamzaException
 
 /**
@@ -71,7 +71,7 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
     val coordinatorSystemConfig = new 
MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG),
 classOf[Config]))
     info("got coordinator system config: %s" format coordinatorSystemConfig)
     val registry = new MetricsRegistryMap
-    val jobCoordinator = JobCoordinator(coordinatorSystemConfig, registry)
+    val jobCoordinator = JobModelManager(coordinatorSystemConfig, registry)
     val config = jobCoordinator.jobModel.getConfig
     val yarnConfig = new YarnConfig(config)
     info("got config: %s" format coordinatorSystemConfig)

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
new file mode 100644
index 0000000..2ed9baf
--- /dev/null
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.samza.SamzaException
+import org.apache.samza.clustermanager.SamzaApplicationState
+import SamzaApplicationState.SamzaAppStatus
+import org.apache.samza.util.Logging
+
+/**
+ * Drives the Yarn application master's lifecycle with the ResourceManager.
+ *
+ * On init it registers with the RM and checks the requested per-container
+ * memory/CPU against the RM's maximum capability. On a reboot signal it throws.
+ * On shutdown it unregisters, but only when the final Samza status is known.
+ */
+//This class is used in the refactored code path as called by run-jc.sh
+class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging {
+  var validResourceRequest = true
+  var shutdownMessage: String = null
+  var webApp: SamzaYarnAppMasterService = null
+
+  /**
+   * Registers the AM with the RM. If the requested container memory or CPU
+   * exceeds the RM's reported maximum, records a shutdown message, marks the
+   * request invalid, and sets the Samza application state to FAILED/unhealthy.
+   */
+  def onInit() {
+    val amHost = state.nodeHost
+    val rpcPort = state.rpcUrl.getPort
+    val trackingAddress = "%s:%d" format (amHost, state.trackingUrl.getPort)
+    val registration = amClient.registerApplicationMaster(amHost, rpcPort, trackingAddress)
+
+    // validate that the YARN cluster can handle our container resource requirements
+    val capability = registration.getMaximumResourceCapability
+    val (maxMem, maxCpu) = (capability.getMemory, capability.getVirtualCores)
+    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
+
+    val exceedsCapability = containerMem > maxMem || containerCpu > maxCpu
+    if (exceedsCapability) {
+      shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
+      error(shutdownMessage)
+      validResourceRequest = false
+      state.samzaAppState.status = SamzaAppStatus.FAILED
+      state.samzaAppState.jobHealthy.set(false)
+    }
+  }
+
+  /** Always throws, so that the AM process fails and is rebooted as the RM requested. */
+  def onReboot() {
+    throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.")
+  }
+
+  /**
+   * Unregisters the AM from the RM unless `samzaAppStatus` is UNDEFINED.
+   *
+   * SUCCEEDED/FAILED are set for errors we catch and handle (e.g. container
+   * failures). Any other AM failure (callback errors, connection failures after
+   * retries, token expirations) leaves the AM registered, so the RM can restart it,
+   * potentially on a different host.
+   */
+  def onShutdown(samzaAppStatus: SamzaAppStatus) {
+    val yarnStatus: FinalApplicationStatus = getStatus(samzaAppStatus)
+    info("Shutting down SamzaAppStatus: " + samzaAppStatus + " yarn status: " + yarnStatus)
+    if (samzaAppStatus == SamzaAppStatus.UNDEFINED) {
+      info("Not unregistering AM from the RM. This will enable RM retries")
+    } else {
+      info("Unregistering AM from the RM.")
+      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      info("Unregister complete.")
+    }
+  }
+
+  /** Maps a Samza application status to Yarn's final status; anything other than FAILED/SUCCEEDED is UNDEFINED. */
+  def getStatus(samzaAppStatus: SamzaAppStatus): FinalApplicationStatus = samzaAppStatus match {
+    case SamzaAppStatus.FAILED => FinalApplicationStatus.FAILED
+    case SamzaAppStatus.SUCCEEDED => FinalApplicationStatus.SUCCEEDED
+    case _ => FinalApplicationStatus.UNDEFINED
+  }
+
+  /** True once onInit has found the resource request unsatisfiable. */
+  def shouldShutdown = !validResourceRequest
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
new file mode 100644
index 0000000..f62bec1
--- /dev/null
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.config.Config
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
+import org.apache.samza.coordinator.stream.messages.SetConfig
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.util.Logging
+
+/**
+ * Samza's application master runs a very basic HTTP/JSON service to allow
+ * dashboards to check on the status of a job. SamzaYarnAppMasterService starts
+ * up the web service when initialized.
+ */
+//This class is used in the refactored code path as called by run-jc.sh
+class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: ReadableMetricsRegistry) extends Logging {
+  var rpcApp: HttpServer = null
+  var webApp: HttpServer = null
+  val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+
+  /**
+   * Starts the rpc and tracking HTTP servers and the job model manager.
+   * Records their URLs in `state` and publishes the coordinator URL to the
+   * coordinator stream under the key `SERVER_URL_OPT`.
+   */
+  def onInit() {
+    // try starting the samza AM dashboard at a random rpc and tracking port
+    info("Starting webapp at a random rpc and tracking port")
+
+    rpcApp = new HttpServer(resourceBasePath = "scalate")
+    //TODO: Since the state has changed into Samza specific and Yarn specific states, this UI has to be refactored too.
+    //rpcApp.addServlet("/*", refactor ApplicationMasterRestServlet(config, state, registry))
+    rpcApp.start
+
+    webApp = new HttpServer(resourceBasePath = "scalate")
+    //webApp.addServlet("/*", refactor ApplicationMasterWebServlet(config, state))
+    webApp.start
+
+    state.jobModelManager.start
+    state.rpcUrl = rpcApp.getUrl
+    state.trackingUrl = webApp.getUrl
+    state.coordinatorUrl = state.jobModelManager.server.getUrl
+
+    //write server url to coordinator stream
+    val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config)
+    coordinatorStreamWriter.start()
+    try {
+      coordinatorStreamWriter.sendMessage(SetConfig.TYPE, SERVER_URL_OPT, state.coordinatorUrl.toString)
+    } finally {
+      // Stop the writer even if sending fails, so its resources are not leaked.
+      coordinatorStreamWriter.stop()
+    }
+    debug("Sent server url message with value: %s " format state.coordinatorUrl.toString)
+
+    info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
+  }
+
+  /** Stops whichever HTTP servers were started, then stops the job model manager. */
+  def onShutdown() {
+    if (rpcApp != null) {
+      rpcApp.stop
+    }
+
+    if (webApp != null) {
+      webApp.stop
+    }
+
+    state.jobModelManager.stop
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
index 5badd29..0bbd48d 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -77,14 +77,14 @@ public abstract class TestContainerAllocatorCommon {
   protected abstract Config getConfig();
   protected abstract MockContainerRequestState 
createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> 
amClient);
 
-  private JobCoordinator getCoordinator(int containerCount) {
+  private JobModelManager getCoordinator(int containerCount) {
     Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
       ContainerModel container = new ContainerModel(i, new HashMap<TaskName, 
TaskModel>());
       containers.put(i, container);
     }
     JobModel jobModel = new JobModel(getConfig(), containers);
-    return new JobCoordinator(jobModel, server, null);
+    return new JobModelManager(jobModel, server, null);
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
index faa697d..d747b81 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
@@ -33,7 +33,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
@@ -98,7 +98,7 @@ public class TestSamzaTaskManager {
   private SamzaAppState state = null;
   private HttpServer server = null;
 
-  private JobCoordinator getCoordinator(int containerCount) {
+  private JobModelManager getCoordinator(int containerCount) {
     Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
       ContainerModel container = new ContainerModel(i, new HashMap<TaskName, 
TaskModel>());
@@ -114,8 +114,8 @@ public class TestSamzaTaskManager {
     when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
 
     JobModel jobModel = new JobModel(getConfig(), containers, 
mockLocalityManager);
-    JobCoordinator.jobModelRef().getAndSet(jobModel);
-    return new JobCoordinator(jobModel, server, null);
+    JobModelManager.jobModelRef().getAndSet(jobModel);
+    return new JobModelManager(jobModel, server, null);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 30cf34f..750f467 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -34,10 +34,10 @@ import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Mockito
 import java.net.URL
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 
 class TestSamzaAppMasterLifecycle {
-  val coordinator = new JobCoordinator(null, null)
+  val coordinator = new JobModelManager(null, null)
   val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, 
Mockito.mock(classOf[CallbackHandler])) {
     var host = ""
     var port = 0

http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index 7f5d9f4..fc0091f 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -34,7 +34,7 @@ import scala.collection.JavaConversions._
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 
 class TestSamzaAppMasterService {
@@ -42,7 +42,7 @@ class TestSamzaAppMasterService {
   @Test
   def testAppMasterDashboardShouldStart {
     val config = getDummyConfig
-    val state = new SamzaAppState(JobCoordinator(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
+    val state = new SamzaAppState(JobModelManager(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
     val service = new SamzaAppMasterService(config, state, null, null)
     val taskName = new TaskName("test")
 
@@ -73,7 +73,7 @@ class TestSamzaAppMasterService {
   def testAppMasterDashboardWebServiceShouldStart {
     // Create some dummy config
     val config = getDummyConfig
-    val state = new SamzaAppState(JobCoordinator(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
+    val state = new SamzaAppState(JobModelManager(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
     val service = new SamzaAppMasterService(config, state, null, null)
 
     // start the dashboard

Reply via email to