Repository: samza
Updated Branches:
  refs/heads/master 920f803a2 -> 9396ee5cc


http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
deleted file mode 100644
index d747b81..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.MockContainerAllocator;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestSamzaTaskManager {
-  private AMRMClientAsyncImpl amRmClientAsync;
-  private TestAMRMClientImpl testAMRMClient;
-
-  private static volatile boolean isRunning = false;
-
-  private Map<String, String> configVals = new HashMap<String, String>()  {
-    {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", 
"org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
-      put("yarn.package.path", "/foo");
-      put("task.inputs", "test-system.test-stream");
-      put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
-      put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.allocator.sleep.ms", "1");
-      put("yarn.container.request.timeout.ms", "2");
-    }
-  };
-  private Config config = new MapConfig(configVals);
-
-  private Config getConfig() {
-    Map<String, String> map = new HashMap<>();
-    map.putAll(config);
-    return new MapConfig(map);
-  }
-
-  private Config getConfigWithHostAffinity() {
-    Map<String, String> map = new HashMap<>();
-    map.putAll(config);
-    map.put("yarn.samza.host-affinity.enabled", "true");
-    return new MapConfig(map);
-  }
-
-  private SamzaAppState state = null;
-  private HttpServer server = null;
-
-  private JobModelManager getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
-    for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, 
TaskModel>());
-      containers.put(i, container);
-    }
-    Map<Integer, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put(0, new HashMap<String, String>(){
-      {
-        put(SetContainerHostMapping.HOST_KEY, "abc");
-      }
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    JobModel jobModel = new JobModel(getConfig(), containers, 
mockLocalityManager);
-    JobModelManager.jobModelRef().getAndSet(jobModel);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-  @Before
-  public void setup() throws Exception {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-
-    server = new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class));
-
-    // Initialize coordinator url
-    state = new SamzaAppState(getCoordinator(1), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 
2);
-    state.coordinatorUrl = new URL("http://localhost:1234");
-  }
-
-  @After
-  public void teardown() {
-    server.stop();
-  }
-
-  private Field getPrivateFieldFromTaskManager(String fieldName, 
SamzaTaskManager object) throws Exception {
-    Field field = object.getClass().getDeclaredField(fieldName);
-    field.setAccessible(true);
-    return field;
-  }
-
-  @Test
-  public void testSamzaTaskManager() throws Exception {
-    Map<String, String> conf = new HashMap<>();
-    conf.putAll(getConfig());
-    conf.put("yarn.container.memory.mb", "500");
-    conf.put("yarn.container.cpu.cores", "5");
-
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        new MapConfig(conf),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-        );
-
-    AbstractContainerAllocator allocator = (AbstractContainerAllocator) 
getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).get(taskManager);
-    assertEquals(ContainerAllocator.class, allocator.getClass());
-    // Asserts that samza exposed container configs is honored by allocator thread
-    assertEquals(500, allocator.containerMaxMemoryMb);
-    assertEquals(5, allocator.containerMaxCpuCore);
-
-    conf.clear();
-    conf.putAll(getConfigWithHostAffinity());
-    conf.put("yarn.container.memory.mb", "500");
-    conf.put("yarn.container.cpu.cores", "5");
-
-    taskManager = new SamzaTaskManager(
-        new MapConfig(conf),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    allocator = (AbstractContainerAllocator) 
getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).get(taskManager);
-    assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
-    // Asserts that samza exposed container configs is honored by allocator thread
-    assertEquals(500, allocator.containerMaxMemoryMb);
-    assertEquals(5, allocator.containerMaxCpuCore);
-  }
-
-  @Test
-  public void testContainerConfigsAreHonoredInAllocator() {
-
-  }
-
-  @Test
-  public void testOnInit() throws Exception {
-    Config conf = getConfig();
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        conf,
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(conf));
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
-
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, new Thread() {
-      public void run() {
-        isRunning = true;
-      }
-    });
-
-    taskManager.onInit();
-    Thread.sleep(1000);
-
-    // Verify Allocator thread has started running
-    assertTrue(isRunning);
-
-    // Verify the remaining state
-    assertEquals(1, state.neededContainers.get());
-    assertEquals(1, allocator.requestedContainers);
-
-    taskManager.onShutdown();
-  }
-
-  @Test
-  public void testOnShutdown() throws Exception {
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    taskManager.onInit();
-
-    Thread.sleep(100);
-
-    Thread allocatorThread = (Thread) 
getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
-    assertTrue(allocatorThread.isAlive());
-
-    taskManager.onShutdown();
-
-    Thread.sleep(100);
-    assertFalse(allocatorThread.isAlive());
-
-  }
-
-  /**
-   * Test Task Manager should stop when all containers finish
-   */
-  @Test
-  public void testTaskManagerShouldStopWhenContainersFinish() {
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(state.amContainerId,
 ContainerExitStatus.SUCCESS, ""));
-
-    assertTrue(taskManager.shouldShutdown());
-  }
-
-  /**
-   * Test Task Manager should request a new container when a task fails with unknown exit code
-   * When host-affinity is not enabled, it will always request for ANY_HOST
-   */
-  @Test
-  public void testNewContainerRequestedOnFailureWithUnknownCode() throws 
Exception {
-    Config conf = getConfig();
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        conf,
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(conf));
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
-
-    // onInit triggers a request
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-
-    Container container = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "abc", 123);
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    // Create first container failure
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
1, "Expecting a failure here"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(ContainerRequestState.ANY_HOST, 
allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    assertTrue(state.jobHealthy.get());
-
-    // Create a second failure
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
1, "Expecting a failure here"));
-
-    // The above failure should trigger a job shutdown because our retry count is set to 1
-    assertEquals(0, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertFalse(state.jobHealthy.get());
-    assertTrue(taskManager.shouldShutdown());
-    assertEquals(FinalApplicationStatus.FAILED, state.status);
-
-    taskManager.onShutdown();
-  }
-
-  /**
-   * Test Task Manager should request a new container when a task fails with unknown exit code
-   * When host-affinity is enabled, it will always request for the same host that it was last seen on
-   */
-  @Test
-  public void testSameContainerRequestedOnFailureWithUnknownCode() throws 
Exception {
-    Config conf = getConfigWithHostAffinity();
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        conf,
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(conf));
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
-
-    // onInit triggers a request
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-
-    Container container = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "abc", 123);
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    // Create first container failure
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
1, "Expecting a failure here"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals("abc", 
allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    assertTrue(state.jobHealthy.get());
-
-    // Create a second failure
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
1, "Expecting a failure here"));
-
-    // The above failure should trigger a job shutdown because our retry count is set to 1
-    assertEquals(0, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertFalse(state.jobHealthy.get());
-    assertTrue(taskManager.shouldShutdown());
-    assertEquals(FinalApplicationStatus.FAILED, state.status);
-
-    taskManager.onShutdown();
-  }
-
-  /**
-   * Test AM requests a new container when a task fails
-   * Error codes with same behavior - Disk failure, preemption and aborted
-   */
-  @Test
-  public void testNewContainerRequestedOnFailureWithKnownCode() throws 
Exception {
-    Map<String, String> config = new HashMap<>();
-    config.putAll(getConfig());
-    config.remove("yarn.container.retry.count");
-
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        new MapConfig(config),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(new MapConfig(config)));
-    getPrivateFieldFromTaskManager("containerAllocator", 
taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", 
taskManager).set(taskManager, thread);
-
-    // Start the task manager
-    taskManager.onInit();
-    assertFalse(taskManager.shouldShutdown());
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-
-    Container container = 
TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
 "abc", 123);
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
ContainerExitStatus.DISKS_FAILED, "Disk failure"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertEquals(ContainerRequestState.ANY_HOST, 
allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-
-    // Create container failure - with ContainerExitStatus.PREEMPTED
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
ContainerExitStatus.PREEMPTED, "Task Preempted by RM"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(ContainerRequestState.ANY_HOST, 
allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-
-    // Create container failure - with ContainerExitStatus.ABORTED
-    
taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 
ContainerExitStatus.ABORTED, "Task Aborted by the NM"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(ContainerRequestState.ANY_HOST, 
allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-
-    taskManager.onShutdown();
-  }
-
-  @Test
-  public void testAppMasterWithFwk () {
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        new MapConfig(config),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-    ContainerId container2 = 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002");
-    taskManager.onContainerAllocated(TestUtil.getContainer(container2, "", 
12345));
-
-
-    configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever");
-    Config config1 = new MapConfig(configVals);
-
-    SamzaTaskManager taskManager1 = new SamzaTaskManager(
-        new MapConfig(config1),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    taskManager1.onInit();
-    taskManager1.onContainerAllocated(TestUtil.getContainer(container2, "", 
12345));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
deleted file mode 100644
index 3290247..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import java.lang.reflect.Field;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.job.yarn.AbstractContainerAllocator;
-import org.apache.samza.job.yarn.ContainerAllocator;
-import org.apache.samza.job.yarn.ContainerRequestState;
-import org.apache.samza.job.yarn.ContainerUtil;
-
-import java.util.Map;
-
-public class MockContainerAllocator extends ContainerAllocator {
-  public int requestedContainers = 0;
-
-  public MockContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> 
amrmClientAsync,
-                                ContainerUtil containerUtil,
-                                YarnConfig yarnConfig) {
-    super(amrmClientAsync, containerUtil, yarnConfig);
-  }
-
-  @Override
-  public void requestContainers(Map<Integer, String> containerToHostMappings) {
-    requestedContainers += containerToHostMappings.size();
-    super.requestContainers(containerToHostMappings);
-  }
-
-  public ContainerRequestState getContainerRequestState() throws Exception {
-    Field field = 
AbstractContainerAllocator.class.getDeclaredField("containerRequestState");
-    field.setAccessible(true);
-
-    return (ContainerRequestState) field.get(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
deleted file mode 100644
index 7c0b504..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.job.yarn.ContainerRequestState;
-import org.apache.samza.job.yarn.SamzaContainerRequest;
-
-
-public class MockContainerRequestState extends ContainerRequestState {
-  private final List<MockContainerListener> mockContainerListeners = new 
ArrayList<MockContainerListener>();
-  private int numAddedContainers = 0;
-  private int numReleasedContainers = 0;
-  private int numAssignedContainers = 0;
-  public Queue<SamzaContainerRequest> assignedRequests = new LinkedList<>();
-
-  public 
MockContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-      boolean hostAffinityEnabled) {
-    super(amClient, hostAffinityEnabled);
-  }
-
-  @Override
-  public synchronized void updateStateAfterAssignment(SamzaContainerRequest 
request, String assignedHost, Container container) {
-    super.updateStateAfterAssignment(request, assignedHost, container);
-
-    numAssignedContainers++;
-    assignedRequests.add(request);
-
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postUpdateRequestStateAfterAssignment(numAssignedContainers);
-    }
-  }
-
-  @Override
-  public synchronized void addContainer(Container container) {
-    super.addContainer(container);
-
-    numAddedContainers++;
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postAddContainer(numAddedContainers);
-    }
-  }
-
-  @Override
-  public synchronized int releaseExtraContainers() {
-    numReleasedContainers += super.releaseExtraContainers();
-
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postReleaseContainers(numReleasedContainers);
-    }
-
-    return numAddedContainers;
-  }
-
-  @Override
-  public void releaseUnstartableContainer(Container container) {
-    super.releaseUnstartableContainer(container);
-
-    numReleasedContainers += 1;
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postReleaseContainers(numReleasedContainers);
-    }
-  }
-
-  public void registerContainerListener(MockContainerListener listener) {
-    mockContainerListeners.add(listener);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
deleted file mode 100644
index cf3e143..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.samza.config.Config;
-import org.apache.samza.job.yarn.ContainerUtil;
-import org.apache.samza.job.yarn.SamzaAppState;
-import org.apache.samza.job.yarn.SamzaContainerLaunchException;
-
-
-public class MockContainerUtil extends ContainerUtil {
-  private final List<MockContainerListener> mockContainerListeners = new 
ArrayList<MockContainerListener>();
-  public final Map<String, List<Container>> runningContainerList = new 
HashMap<>();
-  public Exception containerStartException = null;
-
-  public MockContainerUtil(Config config, SamzaAppState state, 
YarnConfiguration conf, NMClient nmClient) {
-    super(config, state, conf);
-    this.setNmClient(nmClient);
-  }
-
-  @Override
-  public void runContainer(int samzaContainerId, Container container) throws 
SamzaContainerLaunchException {
-    String hostname = container.getNodeHttpAddress().split(":")[0];
-    List<Container> list = runningContainerList.get(hostname);
-    if (list == null) {
-      list = new ArrayList<Container>();
-      list.add(container);
-      runningContainerList.put(hostname, list);
-    } else {
-      list.add(container);
-      runningContainerList.put(hostname, list);
-    }
-    super.runContainer(samzaContainerId, container);
-
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postRunContainer(runningContainerList.size());
-    }
-  }
-
-  @Override
-  public void startContainer(Path packagePath, Container container, 
Map<String, String> env, String cmd) throws
-                                                                               
                          SamzaContainerLaunchException {
-    if (containerStartException != null) {
-      throw new SamzaContainerLaunchException(containerStartException);
-    }
-  }
-
-  public void registerContainerListener(MockContainerListener listener) {
-    mockContainerListeners.add(listener);
-  }
-
-  public void clearContainerListeners() {
-    mockContainerListeners.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java
deleted file mode 100644
index d4c9c96..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
-import org.apache.samza.config.Config;
-import org.apache.samza.job.yarn.ContainerUtil;
-import org.apache.samza.job.yarn.SamzaAppMaster$;
-import org.apache.samza.job.yarn.SamzaAppState;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestUtil {
-
-  public static AMRMClientAsyncImpl<ContainerRequest> getAMClient(final 
TestAMRMClientImpl amClient) {
-    return new AMRMClientAsyncImpl<ContainerRequest>(amClient, 1, 
SamzaAppMaster$.MODULE$) {
-          public TestAMRMClientImpl getClient() {
-            return amClient;
-          }
-    };
-  }
-
-  public static AllocateResponse getAppMasterResponse(final boolean reboot,
-                                               final List<Container> 
containers,
-                                               final List<ContainerStatus> 
completed) {
-    return new AllocateResponse() {
-      @Override
-      public AMCommand getAMCommand() {
-      // Not sure how to throw exception without changing method signature!
-        if (reboot) {
-          try {
-            throw new ApplicationAttemptNotFoundException("Test - out of 
sync");
-          } catch (ApplicationAttemptNotFoundException e) {
-            return AMCommand.AM_RESYNC;
-          }
-        } else {
-          return null;
-        }
-      }
-
-      @Override
-      public void setAMCommand(AMCommand command) {}
-
-      @Override
-      public int getResponseId() {
-        return 0;
-      }
-
-      @Override
-      public void setResponseId(int responseId) {}
-
-      @Override
-      public List<Container> getAllocatedContainers() {
-        return containers;
-      }
-
-      @Override
-      public void setAllocatedContainers(List<Container> containers) {}
-
-      @Override
-      public Resource getAvailableResources() {
-        return null;
-      }
-
-      @Override
-      public void setAvailableResources(Resource limit) {}
-
-      @Override
-      public List<ContainerStatus> getCompletedContainersStatuses() {
-        return completed;
-      }
-
-      @Override
-      public void setCompletedContainersStatuses(List<ContainerStatus> 
containers) {}
-
-      @Override
-      public List<NodeReport> getUpdatedNodes() {
-        return new ArrayList<NodeReport>();
-      }
-
-      @Override
-      public void setUpdatedNodes(List<NodeReport> updatedNodes) {}
-
-      @Override
-      public int getNumClusterNodes() {
-        return 1;
-      }
-
-      @Override
-      public void setNumClusterNodes(int numNodes) {
-
-      }
-
-      @Override
-      public PreemptionMessage getPreemptionMessage() {
-        return null;
-      }
-
-      @Override
-      public void setPreemptionMessage(PreemptionMessage request) {}
-
-      @Override
-      public List<NMToken> getNMTokens() {
-        return new ArrayList<NMToken>();
-      }
-
-      @Override
-      public void setNMTokens(List<NMToken> nmTokens) {}
-
-      @Override
-      public List<ContainerResourceIncrease> getIncreasedContainers() {
-        return Collections.<ContainerResourceIncrease>emptyList();
-      }
-
-      @Override
-      public void setIncreasedContainers(List<ContainerResourceIncrease> 
increasedContainers) {}
-
-      @Override
-      public List<ContainerResourceDecrease> getDecreasedContainers() {
-        return Collections.<ContainerResourceDecrease>emptyList();
-      }
-
-      @Override
-      public void setDecreasedContainers(List<ContainerResourceDecrease> 
decreasedContainers) {
-
-      }
-
-      @Override
-      public Token getAMRMToken() {
-        return null;
-      }
-
-      @Override
-      public void setAMRMToken(Token amRMToken) {}
-    };
-  }
-
-  public static Container getContainer(final ContainerId containerId, final 
String host, final int port) {
-    return new Container() {
-      @Override
-      public ContainerId getId() {
-        return containerId;
-      }
-
-      @Override
-      public void setId(ContainerId id) { }
-
-      @Override
-      public NodeId getNodeId() {
-        return NodeId.newInstance(host, port);
-      }
-
-      @Override
-      public void setNodeId(NodeId nodeId) {  }
-
-      @Override
-      public String getNodeHttpAddress() {
-        return host + ":" + port;
-      }
-
-      @Override
-      public void setNodeHttpAddress(String nodeHttpAddress) {  }
-
-      @Override
-      public Resource getResource() {
-        return null;
-      }
-
-      @Override
-      public void setResource(Resource resource) {  }
-
-      @Override
-      public Priority getPriority() {
-        return null;
-      }
-
-      @Override
-      public void setPriority(Priority priority) {  }
-
-      @Override
-      public Token getContainerToken() {
-        return null;
-      }
-
-      @Override
-      public void setContainerToken(Token containerToken) { }
-
-      @Override
-      public int compareTo(Container o) {
-        return containerId.compareTo(o.getId());
-      }
-    };
-  }
-
-  /**
-   * Returns MockContainerUtil instance with a Mock NMClient
-   * */
-  public static ContainerUtil getContainerUtil(Config config, SamzaAppState 
state) {
-    return new MockContainerUtil(config, state, new YarnConfiguration(), new 
MockNMClient("Mock NMClient"));
-  }
-
-  public static ContainerStatus getContainerStatus(final ContainerId 
containerId,
-                                                   final int exitCode,
-                                                   final String diagnostic) {
-    return new ContainerStatus() {
-      @Override
-      public ContainerId getContainerId() {
-        return containerId;
-      }
-
-      @Override
-      public void setContainerId(ContainerId containerId) { }
-
-      @Override
-      public ContainerState getState() {
-        return null;
-      }
-
-      @Override
-      public void setState(ContainerState state) {  }
-
-      @Override
-      public int getExitStatus() {
-        return exitCode;
-      }
-
-      @Override
-      public void setExitStatus(int exitStatus) { }
-
-      @Override
-      public String getDiagnostics() {
-        return diagnostic;
-      }
-
-      @Override
-      public void setDiagnostics(String diagnostics) {  }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
new file mode 100644
index 0000000..d3d34f2
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.Partition
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, 
SystemAdmin, SystemFactory}
+import scala.collection.JavaConversions._
+
+/**
+ * A mock implementation class that returns metadata for each stream that 
contains numTasks partitions in it.
+ */
+class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
+  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
+    streamNames.map(streamName => {
+      val partitionMetadata = (0 until numTasks).map(partitionId => {
+        new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null)
+      }).toMap
+      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
+    }).toMap[String, SystemStreamMetadata]
+  }
+
+  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def createCoordinatorStream(streamName: String) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def offsetComparator(offset1: String, offset2: String) = null
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala
new file mode 100644
index 0000000..458400e
--- /dev/null
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.config.{JobConfig, Config}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemFactory
+
+/**
+  * A {@link org.apache.samza.system.SystemFactory} implementation that 
returns a {@link org.apache.samza.job.yarn.MockSystemAdmin}.
+  */
+
+class MockSystemFactory extends SystemFactory {
+  // Consumers are not mocked: any request for one fails immediately.
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) =
+    throw new RuntimeException("Hmm. Not implemented.")
+
+  // Producers are not mocked: any request for one fails immediately.
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) =
+    throw new RuntimeException("Hmm. Not implemented.")
+
+  def getAdmin(systemName: String, config: Config) = {
+    // The job's container count becomes MockSystemAdmin's numTasks argument.
+    new MockSystemAdmin(new JobConfig(config).getContainerCount)
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
deleted file mode 100644
index 3f056c4..0000000
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
-import org.apache.samza.job.yarn.util.{TestUtil, TestAMRMClientImpl}
-import org.junit.Test
-import org.junit.Assert._
-
-
-class TestSamzaAppMaster {
-  @Test
-  def testAppMasterShouldShutdown {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container](),
-          new java.util.ArrayList[ContainerStatus]())
-      ))
-    val listener = new YarnAppMasterListener {
-      var init = 0
-      var shutdown = 0
-      var allocated = 0
-      var complete = 0
-      override def shouldShutdown = true
-      override def onInit() {
-        init += 1
-      }
-      override def onShutdown() {
-        shutdown += 1
-      }
-      override def onContainerAllocated(container: Container) {
-        allocated += 1
-      }
-      override def onContainerCompleted(containerStatus: ContainerStatus) {
-        complete += 1
-      }
-    }
-    SamzaAppMaster.listeners = List(listener)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new 
YarnConfiguration, 1)
-    assertEquals(1, listener.init)
-    assertEquals(1, listener.shutdown)
-  }
-
-  @Test
-  def testAppMasterShouldShutdownWithFailingListener {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container](),
-          new java.util.ArrayList[ContainerStatus]())))
-    val listener1 = new YarnAppMasterListener {
-      var shutdown = 0
-      override def shouldShutdown = true
-      override def onShutdown() {
-        shutdown += 1
-        throw new RuntimeException("Some weird failure")
-      }
-    }
-    val listener2 = new YarnAppMasterListener {
-      var shutdown = 0
-      override def shouldShutdown = true
-      override def onShutdown() {
-        shutdown += 1
-      }
-    }
-    // listener1 will throw an exception in shutdown, and listener2 should 
still get called
-    SamzaAppMaster.listeners = List(listener1, listener2)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new 
YarnConfiguration, 1)
-    assertEquals(1, listener1.shutdown)
-    assertEquals(1, listener2.shutdown)
-  }
-
-  @Test
-  def testAppMasterShouldShutdownWithInterrupt {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container](),
-          new java.util.ArrayList[ContainerStatus]())
-      )
-    )
-    val listener = new YarnAppMasterListener {
-      var init = 0
-      var shutdown = 0
-      override def shouldShutdown = false
-      override def onInit() {
-        init += 1
-      }
-      override def onShutdown() {
-        shutdown += 1
-      }
-    }
-    val thread = new Thread {
-      override def run {
-        SamzaAppMaster.listeners = List(listener)
-        SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new 
YarnConfiguration, 1)
-      }
-    }
-    thread.start
-    thread.interrupt
-    thread.join
-    assertEquals(1, listener.init)
-    assertEquals(1, listener.shutdown)
-  }
-
-  @Test
-  def testAppMasterShouldForwardAllocatedAndCompleteContainers {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container]{ add(TestUtil.getContainer(null, 
"", 12345));  },
-          new java.util.ArrayList[ContainerStatus]{ 
add(TestUtil.getContainerStatus(null, 1, null));  })
-      )
-    )
-    val listener = new YarnAppMasterListener {
-      var allocated = 0
-      var complete = 0
-      override def onInit(): Unit = amClient.registerApplicationMaster("", -1, 
"")
-      override def shouldShutdown = (allocated >= 1 && complete >= 1)
-      override def onContainerAllocated(container: Container) {
-        allocated += 1
-      }
-      override def onContainerCompleted(containerStatus: ContainerStatus) {
-        complete += 1
-      }
-    }
-    SamzaAppMaster.listeners = List(listener)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new 
YarnConfiguration, 1)
-    // heartbeat may be triggered for more than once
-    assertTrue(listener.allocated >= 1)
-    assertTrue(listener.complete >= 1)
-  }
-
-  @Test
-  def testAppMasterShouldReboot {
-    val response: AllocateResponse = getAppMasterResponse(
-      true,
-      new java.util.ArrayList[Container](),
-      new java.util.ArrayList[ContainerStatus]())
-
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(response))
-
-    val listener = new YarnAppMasterListener {
-      var reboot = 0
-      override def onInit(): Unit = amClient.registerApplicationMaster("", -1, 
"")
-      override def shouldShutdown = reboot >= 1
-      override def onReboot() {
-        reboot += 1
-      }
-    }
-    SamzaAppMaster.listeners = List(listener)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new 
YarnConfiguration, 1)
-    // heartbeat may be triggered for more than once
-    assertTrue(listener.reboot >= 1)
-  }
-
-  /**
-   * This method is necessary because in Yarn 2.6, an RM reboot results in the 
allocate() method throwing an exception,
-   * rather than invoking AM_RESYNC command. However, we cannot mock out the 
AllocateResponse class in java because it
-   * will require the getAMCommand() signature to change and allow throwing an 
exception. This is however allowed in Scala.
-   * Since this is beyond our scope and we don't have a better way to mock the 
scenario for an RM reboot in our unit
-   * tests, we are keeping the following scala method for now.
-   */
-  def getAppMasterResponse(reboot: Boolean, containers: 
java.util.List[Container], completed: java.util.List[ContainerStatus]) =
-    new AllocateResponse {
-      override def getResponseId() = 0
-      override def setResponseId(responseId: Int) {}
-      override def getAllocatedContainers() = containers
-      override def setAllocatedContainers(containers: 
java.util.List[Container]) {}
-      override def getAvailableResources(): Resource = null
-      override def setAvailableResources(limit: Resource) {}
-      override def getCompletedContainersStatuses() = completed
-      override def setCompletedContainersStatuses(containers: 
java.util.List[ContainerStatus]) {}
-      override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
-      override def getUpdatedNodes = new java.util.ArrayList[NodeReport]()
-      override def getNumClusterNodes = 1
-      override def setNumClusterNodes(num: Int) {}
-      override def getNMTokens = new java.util.ArrayList[NMToken]()
-      override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
-      override def setAMCommand(command: AMCommand) {}
-      override def getPreemptionMessage = null
-      override def setPreemptionMessage(request: PreemptionMessage) {}
-      override def getDecreasedContainers(): 
java.util.List[ContainerResourceDecrease] = 
java.util.Collections.emptyList[ContainerResourceDecrease]
-      override def getIncreasedContainers(): 
java.util.List[ContainerResourceIncrease] = 
java.util.Collections.emptyList[ContainerResourceIncrease]
-      override def setDecreasedContainers(decrease: 
java.util.List[ContainerResourceDecrease]): Unit = Unit
-      override def setIncreasedContainers(increase: 
java.util.List[ContainerResourceIncrease]): Unit = Unit
-
-      override def getAMCommand = if (reboot) {
-        throw new ApplicationAttemptNotFoundException("Test - out of sync")
-      } else {
-        null
-      }
-      override def getAMRMToken: Token = null
-      override def setAMRMToken(amRMToken: Token): Unit = {}
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
deleted file mode 100644
index 750f467..0000000
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.SamzaException
-import org.junit.Assert._
-import org.junit.Test
-import org.mockito.Mockito
-import java.net.URL
-import org.apache.samza.coordinator.JobModelManager
-
-class TestSamzaAppMasterLifecycle {
-  val coordinator = new JobModelManager(null, null)
-  val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, 
Mockito.mock(classOf[CallbackHandler])) {
-    var host = ""
-    var port = 0
-    var status: FinalApplicationStatus = null
-    override def registerApplicationMaster(appHostName: String, appHostPort: 
Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
-      this.host = appHostName
-      this.port = appHostPort
-      new RegisterApplicationMasterResponse {
-        override def setApplicationACLs(map: 
java.util.Map[ApplicationAccessType, String]): Unit = ()
-        override def getApplicationACLs = null
-        override def setMaximumResourceCapability(r: Resource): Unit = ()
-        override def getMaximumResourceCapability = new Resource {
-          def getMemory = 512
-          def getVirtualCores = 2
-          def setMemory(memory: Int) {}
-          def setVirtualCores(vCores: Int) {}
-          def compareTo(o: Resource) = 0
-        }
-        override def getClientToAMTokenMasterKey = null
-        override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
-        override def getContainersFromPreviousAttempts(): 
java.util.List[Container] = java.util.Collections.emptyList[Container]
-        override def getNMTokensFromPreviousAttempts(): 
java.util.List[NMToken] = java.util.Collections.emptyList[NMToken]
-        override def getQueue(): String = null
-        override def setContainersFromPreviousAttempts(containers: 
java.util.List[Container]): Unit = Unit
-        override def setNMTokensFromPreviousAttempts(nmTokens: 
java.util.List[NMToken]): Unit = Unit
-        override def setQueue(queue: String): Unit = Unit
-
-        override def setSchedulerResourceTypes(types: 
java.util.EnumSet[SchedulerResourceTypes]): Unit = {}
-        override def getSchedulerResourceTypes: 
java.util.EnumSet[SchedulerResourceTypes] = null
-      }
-    }
-    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
-      appMessage: String,
-      appTrackingUrl: String) {
-      this.status = appStatus
-    }
-    override def releaseAssignedContainer(containerId: ContainerId) {}
-    override def getClusterNodeCount() = 1
-
-    override def serviceInit(config: Configuration) {}
-    override def serviceStart() {}
-    override def serviceStop() {}
-  }
-
-  @Test
-  def testLifecycleShouldRegisterOnInit {
-    val state = new SamzaAppState(coordinator, -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 
1, 2)
-    state.rpcUrl = new URL("http://localhost:1")
-    state.trackingUrl = new URL("http://localhost:2")
-    val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
-    saml.onInit
-    assertEquals("test", amClient.host)
-    assertEquals(1, amClient.port)
-    assertFalse(saml.shouldShutdown)
-  }
-
-  @Test
-  def testLifecycleShouldUnregisterOnShutdown {
-    val state = new SamzaAppState(coordinator, -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 
2)
-    state.status = FinalApplicationStatus.SUCCEEDED
-    new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
-    assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
-  }
-
-  @Test
-  def testLifecycleShouldThrowAnExceptionOnReboot {
-    var gotException = false
-    try {
-      new SamzaAppMasterLifecycle(368, 1, null, amClient).onReboot
-    } catch {
-      // expected
-      case e: SamzaException => gotException = true
-    }
-    assertTrue(gotException)
-  }
-
-  @Test
-  def testLifecycleShouldShutdownOnInvalidContainerSettings {
-    val state = new SamzaAppState(coordinator, -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 
1, 2)
-    state.rpcUrl = new URL("http://localhost:1")
-    state.trackingUrl = new URL("http://localhost:2")
-    List(new SamzaAppMasterLifecycle(768, 1, state, amClient),
-      new SamzaAppMasterLifecycle(368, 3, state, amClient)).map(saml => {
-        saml.onInit
-        assertTrue(saml.shouldShutdown)
-      })
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
deleted file mode 100644
index 3de5614..0000000
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import java.io.BufferedReader
-import java.net.URL
-import java.io.InputStreamReader
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.MetricsRegistry
-import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, 
SystemAdmin, SystemFactory}
-import org.junit.Assert._
-import org.junit.Test
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-
-class TestSamzaAppMasterService {
-
-  @Test
-  def testAppMasterDashboardShouldStart {
-    val config = getDummyConfig
-    val state = new SamzaAppState(JobModelManager(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
-    val service = new SamzaAppMasterService(config, state, null, null, null)
-    val taskName = new TaskName("test")
-
-    // start the dashboard
-    service.onInit
-    assertTrue(state.rpcUrl.getPort > 0)
-    assertTrue(state.trackingUrl.getPort > 0)
-    assertTrue(state.coordinatorUrl.getPort > 0)
-
-    // check to see if it's running
-    val url = new URL(state.rpcUrl.toString + "am")
-    val is = url.openConnection().getInputStream();
-    val reader = new BufferedReader(new InputStreamReader(is));
-    var line: String = null;
-
-    do {
-      line = reader.readLine()
-    } while (line != null)
-
-    reader.close();
-  }
-
-  /**
-   * This tests the rendering of the index.scaml file containing some Scala code. The objective
-   * is to ensure that the rendered scala code builds correctly
-   */
-  @Test
-  def testAppMasterDashboardWebServiceShouldStart {
-    // Create some dummy config
-    val config = getDummyConfig
-    val state = new SamzaAppState(JobModelManager(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
-    val service = new SamzaAppMasterService(config, state, null, null, null)
-
-    // start the dashboard
-    service.onInit
-    assertTrue(state.rpcUrl.getPort > 0)
-    assertTrue(state.trackingUrl.getPort > 0)
-
-    // Do a GET Request on the tracking port: This in turn will render index.scaml
-    val url = state.trackingUrl
-    val is = url.openConnection().getInputStream()
-    val reader = new BufferedReader(new InputStreamReader(is))
-    var line: String = null
-
-    do {
-      line = reader.readLine()
-    } while (line != null)
-
-    reader.close
-  }
-
-  private def getDummyConfig: Config = new MapConfig(Map[String, String](
-    "job.name" -> "test",
-    "yarn.container.count" -> "1",
-    "systems.test-system.samza.factory" -> 
"org.apache.samza.job.yarn.MockSystemFactory",
-    "yarn.container.memory.mb" -> "512",
-    "yarn.package.path" -> "/foo",
-    "task.inputs" -> "test-system.test-stream",
-    "systems.test-system.samza.key.serde" -> 
"org.apache.samza.serializers.JsonSerde",
-    "systems.test-system.samza.msg.serde" -> 
"org.apache.samza.serializers.JsonSerde",
-    "yarn.container.retry.count" -> "1",
-    "yarn.container.retry.window.ms" -> "1999999999",
-    "job.coordinator.system" -> "coordinator",
-    "systems.coordinator.samza.factory" -> 
classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
-}
-
-class MockSystemFactory extends SystemFactory {
-  def getConsumer(systemName: String, config: Config, registry: 
MetricsRegistry) = {
-    throw new RuntimeException("Hmm. Not implemented.")
-  }
-
-  def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry) = {
-    throw new RuntimeException("Hmm. Not implemented.")
-  }
-
-  def getAdmin(systemName: String, config: Config) = {
-    new MockSystemAdmin(config.getContainerCount)
-  }
-}
-
-/**
- * Helper class that returns metadata for each stream that contains numTasks 
partitions in it.
- */
-class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = 
null
-  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
-    streamNames.map(streamName => {
-      var partitionMetadata = (0 until numTasks).map(partitionId => {
-        new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, 
null, null)
-      }).toMap
-      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
-    }).toMap[String, SystemStreamMetadata]
-  }
-
-  override def createChangelogStream(topicName: String, 
numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def validateChangelogStream(topicName: String, 
numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def createCoordinatorStream(streamName: String) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def offsetComparator(offset1: String, offset2: String) = null
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
new file mode 100644
index 0000000..2664e41
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.net.URL
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.SamzaException
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.clustermanager.SamzaApplicationState.SamzaAppStatus
+import org.apache.samza.coordinator.JobModelManager
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito
+
+class TestSamzaYarnAppMasterLifecycle {
+  val coordinator = new JobModelManager(null, null)
+  val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, 
Mockito.mock(classOf[CallbackHandler])) {
+    var host = ""
+    var port = 0
+    var status: FinalApplicationStatus = null
+    override def registerApplicationMaster(appHostName: String, appHostPort: 
Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
+      this.host = appHostName
+      this.port = appHostPort
+      new RegisterApplicationMasterResponse {
+        override def setApplicationACLs(map: 
java.util.Map[ApplicationAccessType, String]): Unit = ()
+        override def getApplicationACLs = null
+        override def setMaximumResourceCapability(r: Resource): Unit = ()
+        override def getMaximumResourceCapability = new Resource {
+          def getMemory = 512
+          def getVirtualCores = 2
+          def setMemory(memory: Int) {}
+          def setVirtualCores(vCores: Int) {}
+          def compareTo(o: Resource) = 0
+        }
+        override def getClientToAMTokenMasterKey = null
+        override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
+        override def getContainersFromPreviousAttempts(): 
java.util.List[Container] = java.util.Collections.emptyList[Container]
+        override def getNMTokensFromPreviousAttempts(): 
java.util.List[NMToken] = java.util.Collections.emptyList[NMToken]
+        override def getQueue(): String = null
+        override def setContainersFromPreviousAttempts(containers: 
java.util.List[Container]): Unit = Unit
+        override def setNMTokensFromPreviousAttempts(nmTokens: 
java.util.List[NMToken]): Unit = Unit
+        override def setQueue(queue: String): Unit = Unit
+
+        override def setSchedulerResourceTypes(types: 
java.util.EnumSet[SchedulerResourceTypes]): Unit = {}
+        override def getSchedulerResourceTypes: 
java.util.EnumSet[SchedulerResourceTypes] = null
+      }
+    }
+    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
+      appMessage: String,
+      appTrackingUrl: String) {
+      this.status = appStatus
+    }
+    override def releaseAssignedContainer(containerId: ContainerId) {}
+    override def getClusterNodeCount() = 1
+
+    override def serviceInit(config: Configuration) {}
+    override def serviceStart() {}
+    override def serviceStop() {}
+  }
+
+  @Test
+  def testLifecycleShouldRegisterOnInit {
+    val state = new SamzaApplicationState(coordinator)
+
+    val yarnState = new YarnAppState(1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), 
"testHost", 1, 2);
+    yarnState.rpcUrl = new URL("http://localhost:1")
+    yarnState.trackingUrl = new URL("http://localhost:2")
+
+    val saml = new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, 
amClient)
+    saml.onInit
+    assertEquals("testHost", amClient.host)
+    assertEquals(1, amClient.port)
+    assertFalse(saml.shouldShutdown)
+  }
+
+  @Test
+  def testLifecycleShouldUnregisterOnShutdown {
+    val state = new SamzaApplicationState(coordinator)
+
+    val yarnState =  new YarnAppState(1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), 
"testHost", 1, 2);
+    new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, 
amClient).onShutdown (SamzaAppStatus.SUCCEEDED)
+    assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
+  }
+
+  @Test
+  def testLifecycleShouldThrowAnExceptionOnReboot {
+    var gotException = false
+    try {
+      val state = new SamzaApplicationState(coordinator)
+
+      val yarnState =  new YarnAppState(1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), 
"testHost", 1, 2);
+      new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, 
amClient).onReboot()
+    } catch {
+      // expected
+      case e: SamzaException => gotException = true
+    }
+    assertTrue(gotException)
+  }
+
+  @Test
+  def testLifecycleShouldShutdownOnInvalidContainerSettings {
+    val state = new SamzaApplicationState(coordinator)
+
+    val yarnState =  new YarnAppState(1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), 
"testHost", 1, 2);
+    yarnState.rpcUrl = new URL("http://localhost:1")
+    yarnState.trackingUrl = new URL("http://localhost:2")
+
+    //Request a higher amount of memory from yarn.
+    List(new SamzaYarnAppMasterLifecycle(768, 1, state, yarnState, amClient),
+    //Request a higher number of cores from yarn.
+      new SamzaYarnAppMasterLifecycle(368, 3, state, yarnState, 
amClient)).map(saml => {
+        saml.onInit
+        assertTrue(saml.shouldShutdown)
+      })
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
new file mode 100644
index 0000000..1dd0c18
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.io.BufferedReader
+import java.net.URL
+import java.io.InputStreamReader
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.Partition
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.MetricsRegistry
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, 
SystemAdmin, SystemFactory}
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+
+class TestSamzaYarnAppMasterService {
+
+  @Test
+  def testAppMasterDashboardShouldStart {
+    val config = getDummyConfig
+    val jobModelManager = JobModelManager(config)
+    val samzaState = new SamzaApplicationState(jobModelManager)
+
+    val state = new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), 
"testHost", 1, 1);
+    val service = new SamzaYarnAppMasterService(config, samzaState, state, 
null, null)
+    val taskName = new TaskName("test")
+
+    // start the dashboard
+    service.onInit
+    assertTrue(state.rpcUrl.getPort > 0)
+    assertTrue(state.trackingUrl.getPort > 0)
+    assertTrue(state.coordinatorUrl.getPort > 0)
+
+    // check to see if it's running
+    val url = new URL(state.rpcUrl.toString + "am")
+    val is = url.openConnection().getInputStream();
+    val reader = new BufferedReader(new InputStreamReader(is));
+    var line: String = null;
+
+    do {
+      line = reader.readLine()
+    } while (line != null)
+
+    reader.close();
+  }
+
+  /**
+   * This tests the rendering of the index.scaml file containing some Scala code. The objective
+   * is to ensure that the rendered scala code builds correctly
+   */
+  @Test
+  def testAppMasterDashboardWebServiceShouldStart {
+    // Create some dummy config
+    val config = getDummyConfig
+    val jobModelManager = JobModelManager(config)
+    val samzaState = new SamzaApplicationState(jobModelManager)
+    val state = new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), 
"testHost", 1, 1);
+
+    val service = new SamzaYarnAppMasterService(config, samzaState, state, 
null, null)
+
+    // start the dashboard
+    service.onInit
+    assertTrue(state.rpcUrl.getPort > 0)
+    assertTrue(state.trackingUrl.getPort > 0)
+
+    // Do a GET Request on the tracking port: This in turn will render index.scaml
+    val url = state.trackingUrl
+    val is = url.openConnection().getInputStream()
+    val reader = new BufferedReader(new InputStreamReader(is))
+    var line: String = null
+
+    do {
+      line = reader.readLine()
+    } while (line != null)
+
+    reader.close
+  }
+
+  private def getDummyConfig: Config = new MapConfig(Map[String, String](
+    "job.name" -> "test",
+    "yarn.container.count" -> "1",
+    "systems.test-system.samza.factory" -> 
"org.apache.samza.job.yarn.MockSystemFactory",
+    "yarn.container.memory.mb" -> "512",
+    "yarn.package.path" -> "/foo",
+    "task.inputs" -> "test-system.test-stream",
+    "systems.test-system.samza.key.serde" -> 
"org.apache.samza.serializers.JsonSerde",
+    "systems.test-system.samza.msg.serde" -> 
"org.apache.samza.serializers.JsonSerde",
+    "yarn.container.retry.count" -> "1",
+    "yarn.container.retry.window.ms" -> "1999999999",
+    "job.coordinator.system" -> "coordinator",
+    "systems.coordinator.samza.factory" -> 
classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
+}
+
+
+
+

Reply via email to