Repository: samza Updated Branches: refs/heads/master 920f803a2 -> 9396ee5cc
http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java deleted file mode 100644 index d747b81..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java +++ /dev/null @@ -1,502 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.job.yarn; - -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.YarnConfig; -import org.apache.samza.container.LocalityManager; -import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.JobModelManager; -import org.apache.samza.coordinator.server.HttpServer; -import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; -import org.apache.samza.job.yarn.util.MockContainerAllocator; -import org.apache.samza.job.yarn.util.MockHttpServer; -import org.apache.samza.job.yarn.util.TestAMRMClientImpl; -import org.apache.samza.job.yarn.util.TestUtil; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestSamzaTaskManager { - private AMRMClientAsyncImpl amRmClientAsync; - private TestAMRMClientImpl testAMRMClient; - - private static volatile boolean isRunning = false; - - private Map<String, String> 
configVals = new HashMap<String, String>() { - { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); - put("yarn.package.path", "/foo"); - put("task.inputs", "test-system.test-stream"); - put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); - put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.allocator.sleep.ms", "1"); - put("yarn.container.request.timeout.ms", "2"); - } - }; - private Config config = new MapConfig(configVals); - - private Config getConfig() { - Map<String, String> map = new HashMap<>(); - map.putAll(config); - return new MapConfig(map); - } - - private Config getConfigWithHostAffinity() { - Map<String, String> map = new HashMap<>(); - map.putAll(config); - map.put("yarn.samza.host-affinity.enabled", "true"); - return new MapConfig(map); - } - - private SamzaAppState state = null; - private HttpServer server = null; - - private JobModelManager getCoordinator(int containerCount) { - Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); - for (int i = 0; i < containerCount; i++) { - ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); - containers.put(i, container); - } - Map<Integer, Map<String, String>> localityMap = new HashMap<>(); - localityMap.put(0, new HashMap<String, String>(){ - { - put(SetContainerHostMapping.HOST_KEY, "abc"); - } - }); - LocalityManager mockLocalityManager = mock(LocalityManager.class); - when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap); - - JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager); - JobModelManager.jobModelRef().getAndSet(jobModel); - return new JobModelManager(jobModel, server, null); - } - - @Before - public void setup() 
throws Exception { - // Create AMRMClient - testAMRMClient = new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new ArrayList<Container>(), - new ArrayList<ContainerStatus>() - )); - amRmClientAsync = TestUtil.getAMClient(testAMRMClient); - - server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); - - // Initialize coordinator url - state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2); - state.coordinatorUrl = new URL("http://localhost:1234"); - } - - @After - public void teardown() { - server.stop(); - } - - private Field getPrivateFieldFromTaskManager(String fieldName, SamzaTaskManager object) throws Exception { - Field field = object.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - return field; - } - - @Test - public void testSamzaTaskManager() throws Exception { - Map<String, String> conf = new HashMap<>(); - conf.putAll(getConfig()); - conf.put("yarn.container.memory.mb", "500"); - conf.put("yarn.container.cpu.cores", "5"); - - SamzaTaskManager taskManager = new SamzaTaskManager( - new MapConfig(conf), - state, - amRmClientAsync, - new YarnConfiguration() - ); - - AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager); - assertEquals(ContainerAllocator.class, allocator.getClass()); - // Asserts that samza exposed container configs is honored by allocator thread - assertEquals(500, allocator.containerMaxMemoryMb); - assertEquals(5, allocator.containerMaxCpuCore); - - conf.clear(); - conf.putAll(getConfigWithHostAffinity()); - conf.put("yarn.container.memory.mb", "500"); - conf.put("yarn.container.cpu.cores", "5"); - - taskManager = new SamzaTaskManager( - new MapConfig(conf), - state, - amRmClientAsync, - new YarnConfiguration() - ); - - allocator = (AbstractContainerAllocator) 
getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager); - assertEquals(HostAwareContainerAllocator.class, allocator.getClass()); - // Asserts that samza exposed container configs is honored by allocator thread - assertEquals(500, allocator.containerMaxMemoryMb); - assertEquals(5, allocator.containerMaxCpuCore); - } - - @Test - public void testContainerConfigsAreHonoredInAllocator() { - - } - - @Test - public void testOnInit() throws Exception { - Config conf = getConfig(); - SamzaTaskManager taskManager = new SamzaTaskManager( - conf, - state, - amRmClientAsync, - new YarnConfiguration() - ); - - MockContainerAllocator allocator = new MockContainerAllocator( - amRmClientAsync, - TestUtil.getContainerUtil(getConfig(), state), - new YarnConfig(conf)); - getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); - - getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() { - public void run() { - isRunning = true; - } - }); - - taskManager.onInit(); - Thread.sleep(1000); - - // Verify Allocator thread has started running - assertTrue(isRunning); - - // Verify the remaining state - assertEquals(1, state.neededContainers.get()); - assertEquals(1, allocator.requestedContainers); - - taskManager.onShutdown(); - } - - @Test - public void testOnShutdown() throws Exception { - SamzaTaskManager taskManager = new SamzaTaskManager( - getConfig(), - state, - amRmClientAsync, - new YarnConfiguration() - ); - taskManager.onInit(); - - Thread.sleep(100); - - Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager); - assertTrue(allocatorThread.isAlive()); - - taskManager.onShutdown(); - - Thread.sleep(100); - assertFalse(allocatorThread.isAlive()); - - } - - /** - * Test Task Manager should stop when all containers finish - */ - @Test - public void testTaskManagerShouldStopWhenContainersFinish() { - SamzaTaskManager 
taskManager = new SamzaTaskManager( - getConfig(), - state, - amRmClientAsync, - new YarnConfiguration() - ); - - taskManager.onInit(); - - assertFalse(taskManager.shouldShutdown()); - - taskManager.onContainerCompleted(TestUtil.getContainerStatus(state.amContainerId, ContainerExitStatus.SUCCESS, "")); - - assertTrue(taskManager.shouldShutdown()); - } - - /** - * Test Task Manager should request a new container when a task fails with unknown exit code - * When host-affinity is not enabled, it will always request for ANY_HOST - */ - @Test - public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception { - Config conf = getConfig(); - SamzaTaskManager taskManager = new SamzaTaskManager( - conf, - state, - amRmClientAsync, - new YarnConfiguration() - ); - MockContainerAllocator allocator = new MockContainerAllocator( - amRmClientAsync, - TestUtil.getContainerUtil(getConfig(), state), - new YarnConfig(conf)); - getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); - - Thread thread = new Thread(allocator); - getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); - - // onInit triggers a request - taskManager.onInit(); - - assertFalse(taskManager.shouldShutdown()); - assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - taskManager.onContainerAllocated(container); - - // Allow container to run and update state - Thread.sleep(300); - - // Create first container failure - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here")); - - // The above failure should trigger a container request - assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - assertEquals(ContainerRequestState.ANY_HOST, 
allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost()); - assertFalse(taskManager.shouldShutdown()); - assertFalse(state.jobHealthy.get()); - assertEquals(2, testAMRMClient.requests.size()); - assertEquals(0, testAMRMClient.getRelease().size()); - - taskManager.onContainerAllocated(container); - - // Allow container to run and update state - Thread.sleep(300); - - assertTrue(state.jobHealthy.get()); - - // Create a second failure - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here")); - - // The above failure should trigger a job shutdown because our retry count is set to 1 - assertEquals(0, allocator.getContainerRequestState().getRequestsQueue().size()); - assertEquals(2, testAMRMClient.requests.size()); - assertEquals(0, testAMRMClient.getRelease().size()); - assertFalse(state.jobHealthy.get()); - assertTrue(taskManager.shouldShutdown()); - assertEquals(FinalApplicationStatus.FAILED, state.status); - - taskManager.onShutdown(); - } - - /** - * Test Task Manager should request a new container when a task fails with unknown exit code - * When host-affinity is enabled, it will always request for the same host that it was last seen on - */ - @Test - public void testSameContainerRequestedOnFailureWithUnknownCode() throws Exception { - Config conf = getConfigWithHostAffinity(); - SamzaTaskManager taskManager = new SamzaTaskManager( - conf, - state, - amRmClientAsync, - new YarnConfiguration() - ); - MockContainerAllocator allocator = new MockContainerAllocator( - amRmClientAsync, - TestUtil.getContainerUtil(getConfig(), state), - new YarnConfig(conf)); - getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); - - Thread thread = new Thread(allocator); - getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); - - // onInit triggers a request - taskManager.onInit(); - - assertFalse(taskManager.shouldShutdown()); - 
assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - taskManager.onContainerAllocated(container); - - // Allow container to run and update state - Thread.sleep(300); - - // Create first container failure - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here")); - - // The above failure should trigger a container request - assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - assertEquals("abc", allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost()); - assertFalse(taskManager.shouldShutdown()); - assertFalse(state.jobHealthy.get()); - assertEquals(2, testAMRMClient.requests.size()); - assertEquals(0, testAMRMClient.getRelease().size()); - - taskManager.onContainerAllocated(container); - - // Allow container to run and update state - Thread.sleep(300); - - assertTrue(state.jobHealthy.get()); - - // Create a second failure - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here")); - - // The above failure should trigger a job shutdown because our retry count is set to 1 - assertEquals(0, allocator.getContainerRequestState().getRequestsQueue().size()); - assertEquals(2, testAMRMClient.requests.size()); - assertEquals(0, testAMRMClient.getRelease().size()); - assertFalse(state.jobHealthy.get()); - assertTrue(taskManager.shouldShutdown()); - assertEquals(FinalApplicationStatus.FAILED, state.status); - - taskManager.onShutdown(); - } - - /** - * Test AM requests a new container when a task fails - * Error codes with same behavior - Disk failure, preemption and aborted - */ - @Test - public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception { - Map<String, String> config = new HashMap<>(); - config.putAll(getConfig()); - 
config.remove("yarn.container.retry.count"); - - SamzaTaskManager taskManager = new SamzaTaskManager( - new MapConfig(config), - state, - amRmClientAsync, - new YarnConfiguration() - ); - MockContainerAllocator allocator = new MockContainerAllocator( - amRmClientAsync, - TestUtil.getContainerUtil(getConfig(), state), - new YarnConfig(new MapConfig(config))); - getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); - - Thread thread = new Thread(allocator); - getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); - - // Start the task manager - taskManager.onInit(); - assertFalse(taskManager.shouldShutdown()); - assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - taskManager.onContainerAllocated(container); - - // Allow container to run and update state - Thread.sleep(300); - - // Create container failure - with ContainerExitStatus.DISKS_FAILED - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), ContainerExitStatus.DISKS_FAILED, "Disk failure")); - - // The above failure should trigger a container request - assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - assertFalse(taskManager.shouldShutdown()); - assertFalse(state.jobHealthy.get()); - assertEquals(2, testAMRMClient.requests.size()); - assertEquals(0, testAMRMClient.getRelease().size()); - assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost()); - - // Create container failure - with ContainerExitStatus.PREEMPTED - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), ContainerExitStatus.PREEMPTED, "Task Preempted by RM")); - - // The above failure should trigger a container request - assertEquals(1, 
allocator.getContainerRequestState().getRequestsQueue().size()); - assertFalse(taskManager.shouldShutdown()); - assertFalse(state.jobHealthy.get()); - assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost()); - - // Create container failure - with ContainerExitStatus.ABORTED - taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), ContainerExitStatus.ABORTED, "Task Aborted by the NM")); - - // The above failure should trigger a container request - assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size()); - assertEquals(2, testAMRMClient.requests.size()); - assertEquals(0, testAMRMClient.getRelease().size()); - assertFalse(taskManager.shouldShutdown()); - assertFalse(state.jobHealthy.get()); - assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost()); - - taskManager.onShutdown(); - } - - @Test - public void testAppMasterWithFwk () { - SamzaTaskManager taskManager = new SamzaTaskManager( - new MapConfig(config), - state, - amRmClientAsync, - new YarnConfiguration() - ); - taskManager.onInit(); - - assertFalse(taskManager.shouldShutdown()); - ContainerId container2 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"); - taskManager.onContainerAllocated(TestUtil.getContainer(container2, "", 12345)); - - - configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever"); - Config config1 = new MapConfig(configVals); - - SamzaTaskManager taskManager1 = new SamzaTaskManager( - new MapConfig(config1), - state, - amRmClientAsync, - new YarnConfiguration() - ); - - taskManager1.onInit(); - taskManager1.onContainerAllocated(TestUtil.getContainer(container2, "", 12345)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java 
---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java deleted file mode 100644 index 3290247..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.job.yarn.util; - -import java.lang.reflect.Field; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.samza.config.YarnConfig; -import org.apache.samza.job.yarn.AbstractContainerAllocator; -import org.apache.samza.job.yarn.ContainerAllocator; -import org.apache.samza.job.yarn.ContainerRequestState; -import org.apache.samza.job.yarn.ContainerUtil; - -import java.util.Map; - -public class MockContainerAllocator extends ContainerAllocator { - public int requestedContainers = 0; - - public MockContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync, - ContainerUtil containerUtil, - YarnConfig yarnConfig) { - super(amrmClientAsync, containerUtil, yarnConfig); - } - - @Override - public void requestContainers(Map<Integer, String> containerToHostMappings) { - requestedContainers += containerToHostMappings.size(); - super.requestContainers(containerToHostMappings); - } - - public ContainerRequestState getContainerRequestState() throws Exception { - Field field = AbstractContainerAllocator.class.getDeclaredField("containerRequestState"); - field.setAccessible(true); - - return (ContainerRequestState) field.get(this); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java deleted file mode 100644 index 7c0b504..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.job.yarn.util; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.samza.job.yarn.ContainerRequestState; -import org.apache.samza.job.yarn.SamzaContainerRequest; - - -public class MockContainerRequestState extends ContainerRequestState { - private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>(); - private int numAddedContainers = 0; - private int numReleasedContainers = 0; - private int numAssignedContainers = 0; - public Queue<SamzaContainerRequest> assignedRequests = new LinkedList<>(); - - public MockContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient, - boolean hostAffinityEnabled) { - super(amClient, hostAffinityEnabled); - } - - @Override - public synchronized void updateStateAfterAssignment(SamzaContainerRequest request, String assignedHost, Container container) { - super.updateStateAfterAssignment(request, assignedHost, container); - - numAssignedContainers++; - assignedRequests.add(request); - - for 
(MockContainerListener listener : mockContainerListeners) { - listener.postUpdateRequestStateAfterAssignment(numAssignedContainers); - } - } - - @Override - public synchronized void addContainer(Container container) { - super.addContainer(container); - - numAddedContainers++; - for (MockContainerListener listener : mockContainerListeners) { - listener.postAddContainer(numAddedContainers); - } - } - - @Override - public synchronized int releaseExtraContainers() { - numReleasedContainers += super.releaseExtraContainers(); - - for (MockContainerListener listener : mockContainerListeners) { - listener.postReleaseContainers(numReleasedContainers); - } - - return numAddedContainers; - } - - @Override - public void releaseUnstartableContainer(Container container) { - super.releaseUnstartableContainer(container); - - numReleasedContainers += 1; - for (MockContainerListener listener : mockContainerListeners) { - listener.postReleaseContainers(numReleasedContainers); - } - } - - public void registerContainerListener(MockContainerListener listener) { - mockContainerListeners.add(listener); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java deleted file mode 100644 index cf3e143..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.job.yarn.util; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.client.api.NMClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.samza.config.Config; -import org.apache.samza.job.yarn.ContainerUtil; -import org.apache.samza.job.yarn.SamzaAppState; -import org.apache.samza.job.yarn.SamzaContainerLaunchException; - - -public class MockContainerUtil extends ContainerUtil { - private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>(); - public final Map<String, List<Container>> runningContainerList = new HashMap<>(); - public Exception containerStartException = null; - - public MockContainerUtil(Config config, SamzaAppState state, YarnConfiguration conf, NMClient nmClient) { - super(config, state, conf); - this.setNmClient(nmClient); - } - - @Override - public void runContainer(int samzaContainerId, Container container) throws SamzaContainerLaunchException { - String hostname = container.getNodeHttpAddress().split(":")[0]; - List<Container> list = runningContainerList.get(hostname); - if (list == null) { - list = new ArrayList<Container>(); - list.add(container); - runningContainerList.put(hostname, list); - } 
else { - list.add(container); - runningContainerList.put(hostname, list); - } - super.runContainer(samzaContainerId, container); - - for (MockContainerListener listener : mockContainerListeners) { - listener.postRunContainer(runningContainerList.size()); - } - } - - @Override - public void startContainer(Path packagePath, Container container, Map<String, String> env, String cmd) throws - SamzaContainerLaunchException { - if (containerStartException != null) { - throw new SamzaContainerLaunchException(containerStartException); - } - } - - public void registerContainerListener(MockContainerListener listener) { - mockContainerListeners.add(listener); - } - - public void clearContainerListeners() { - mockContainerListeners.clear(); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java deleted file mode 100644 index d4c9c96..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.job.yarn.util; - -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; - -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; -import org.apache.samza.config.Config; -import org.apache.samza.job.yarn.ContainerUtil; -import org.apache.samza.job.yarn.SamzaAppMaster$; -import org.apache.samza.job.yarn.SamzaAppState; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class TestUtil { - - public static AMRMClientAsyncImpl<ContainerRequest> getAMClient(final TestAMRMClientImpl amClient) { - return new AMRMClientAsyncImpl<ContainerRequest>(amClient, 1, SamzaAppMaster$.MODULE$) { - public TestAMRMClientImpl getClient() { - return amClient; - } - }; - } - - public static AllocateResponse getAppMasterResponse(final boolean reboot, - final List<Container> containers, - final List<ContainerStatus> completed) { - return new AllocateResponse() { - @Override - public AMCommand getAMCommand() { - // Not sure how to throw exception without changing method signature! 
- if (reboot) { - try { - throw new ApplicationAttemptNotFoundException("Test - out of sync"); - } catch (ApplicationAttemptNotFoundException e) { - return AMCommand.AM_RESYNC; - } - } else { - return null; - } - } - - @Override - public void setAMCommand(AMCommand command) {} - - @Override - public int getResponseId() { - return 0; - } - - @Override - public void setResponseId(int responseId) {} - - @Override - public List<Container> getAllocatedContainers() { - return containers; - } - - @Override - public void setAllocatedContainers(List<Container> containers) {} - - @Override - public Resource getAvailableResources() { - return null; - } - - @Override - public void setAvailableResources(Resource limit) {} - - @Override - public List<ContainerStatus> getCompletedContainersStatuses() { - return completed; - } - - @Override - public void setCompletedContainersStatuses(List<ContainerStatus> containers) {} - - @Override - public List<NodeReport> getUpdatedNodes() { - return new ArrayList<NodeReport>(); - } - - @Override - public void setUpdatedNodes(List<NodeReport> updatedNodes) {} - - @Override - public int getNumClusterNodes() { - return 1; - } - - @Override - public void setNumClusterNodes(int numNodes) { - - } - - @Override - public PreemptionMessage getPreemptionMessage() { - return null; - } - - @Override - public void setPreemptionMessage(PreemptionMessage request) {} - - @Override - public List<NMToken> getNMTokens() { - return new ArrayList<NMToken>(); - } - - @Override - public void setNMTokens(List<NMToken> nmTokens) {} - - @Override - public List<ContainerResourceIncrease> getIncreasedContainers() { - return Collections.<ContainerResourceIncrease>emptyList(); - } - - @Override - public void setIncreasedContainers(List<ContainerResourceIncrease> increasedContainers) {} - - @Override - public List<ContainerResourceDecrease> getDecreasedContainers() { - return Collections.<ContainerResourceDecrease>emptyList(); - } - - @Override - public void 
setDecreasedContainers(List<ContainerResourceDecrease> decreasedContainers) { - - } - - @Override - public Token getAMRMToken() { - return null; - } - - @Override - public void setAMRMToken(Token amRMToken) {} - }; - } - - public static Container getContainer(final ContainerId containerId, final String host, final int port) { - return new Container() { - @Override - public ContainerId getId() { - return containerId; - } - - @Override - public void setId(ContainerId id) { } - - @Override - public NodeId getNodeId() { - return NodeId.newInstance(host, port); - } - - @Override - public void setNodeId(NodeId nodeId) { } - - @Override - public String getNodeHttpAddress() { - return host + ":" + port; - } - - @Override - public void setNodeHttpAddress(String nodeHttpAddress) { } - - @Override - public Resource getResource() { - return null; - } - - @Override - public void setResource(Resource resource) { } - - @Override - public Priority getPriority() { - return null; - } - - @Override - public void setPriority(Priority priority) { } - - @Override - public Token getContainerToken() { - return null; - } - - @Override - public void setContainerToken(Token containerToken) { } - - @Override - public int compareTo(Container o) { - return containerId.compareTo(o.getId()); - } - }; - } - - /** - * Returns MockContainerUtil instance with a Mock NMClient - * */ - public static ContainerUtil getContainerUtil(Config config, SamzaAppState state) { - return new MockContainerUtil(config, state, new YarnConfiguration(), new MockNMClient("Mock NMClient")); - } - - public static ContainerStatus getContainerStatus(final ContainerId containerId, - final int exitCode, - final String diagnostic) { - return new ContainerStatus() { - @Override - public ContainerId getContainerId() { - return containerId; - } - - @Override - public void setContainerId(ContainerId containerId) { } - - @Override - public ContainerState getState() { - return null; - } - - @Override - public void 
setState(ContainerState state) { } - - @Override - public int getExitStatus() { - return exitCode; - } - - @Override - public void setExitStatus(int exitStatus) { } - - @Override - public String getDiagnostics() { - return diagnostic; - } - - @Override - public void setDiagnostics(String diagnostics) { } - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala new file mode 100644 index 0000000..d3d34f2 --- /dev/null +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn + +import org.apache.samza.Partition +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory} +import scala.collection.JavaConversions._ + +/** + * A mock implementation class that returns metadata for each stream that contains numTasks partitions in it. + */ +class MockSystemAdmin(numTasks: Int) extends SystemAdmin { + def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null + def getSystemStreamMetadata(streamNames: java.util.Set[String]) = { + streamNames.map(streamName => { + var partitionMetadata = (0 until numTasks).map(partitionId => { + new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null) + }).toMap + streamName -> new SystemStreamMetadata(streamName, partitionMetadata) + }).toMap[String, SystemStreamMetadata] + } + + override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { + new UnsupportedOperationException("Method not implemented.") + } + + override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { + new UnsupportedOperationException("Method not implemented.") + } + + override def createCoordinatorStream(streamName: String) { + new UnsupportedOperationException("Method not implemented.") + } + + override def offsetComparator(offset1: String, offset2: String) = null +} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala new file mode 100644 index 0000000..458400e --- /dev/null +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala 
@@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn + +import org.apache.samza.config.{JobConfig, Config} +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.system.SystemFactory + +/** + * A {@link org.apache.samza.system.SystemFactory} implementation that returns a {@link org.apache.samza.job.yarn.MockSystemAdmin}. + */ + +class MockSystemFactory extends SystemFactory { + def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { + throw new RuntimeException("Hmm. Not implemented.") + } + + def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = { + throw new RuntimeException("Hmm. 
Not implemented.") + } + + def getAdmin(systemName: String, config: Config) = { + val jobConfig = new JobConfig(config) + new MockSystemAdmin(jobConfig.getContainerCount) + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala deleted file mode 100644 index 3f056c4..0000000 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.job.yarn - -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse - -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException -import org.apache.samza.job.yarn.util.{TestUtil, TestAMRMClientImpl} -import org.junit.Test -import org.junit.Assert._ - - -class TestSamzaAppMaster { - @Test - def testAppMasterShouldShutdown { - val amClient = TestUtil.getAMClient( - new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new java.util.ArrayList[Container](), - new java.util.ArrayList[ContainerStatus]()) - )) - val listener = new YarnAppMasterListener { - var init = 0 - var shutdown = 0 - var allocated = 0 - var complete = 0 - override def shouldShutdown = true - override def onInit() { - init += 1 - } - override def onShutdown() { - shutdown += 1 - } - override def onContainerAllocated(container: Container) { - allocated += 1 - } - override def onContainerCompleted(containerStatus: ContainerStatus) { - complete += 1 - } - } - SamzaAppMaster.listeners = List(listener) - SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) - assertEquals(1, listener.init) - assertEquals(1, listener.shutdown) - } - - @Test - def testAppMasterShouldShutdownWithFailingListener { - val amClient = TestUtil.getAMClient( - new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new java.util.ArrayList[Container](), - new java.util.ArrayList[ContainerStatus]()))) - val listener1 = new YarnAppMasterListener { - var shutdown = 0 - override def shouldShutdown = true - override def onShutdown() { - shutdown += 1 - throw new RuntimeException("Some weird failure") - } - } - val listener2 = new YarnAppMasterListener { - var shutdown = 0 - override def shouldShutdown = true - override def onShutdown() { - shutdown += 1 - } - } - // listener1 will throw an exception in shutdown, and listener2 
should still get called - SamzaAppMaster.listeners = List(listener1, listener2) - SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) - assertEquals(1, listener1.shutdown) - assertEquals(1, listener2.shutdown) - } - - @Test - def testAppMasterShouldShutdownWithInterrupt { - val amClient = TestUtil.getAMClient( - new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new java.util.ArrayList[Container](), - new java.util.ArrayList[ContainerStatus]()) - ) - ) - val listener = new YarnAppMasterListener { - var init = 0 - var shutdown = 0 - override def shouldShutdown = false - override def onInit() { - init += 1 - } - override def onShutdown() { - shutdown += 1 - } - } - val thread = new Thread { - override def run { - SamzaAppMaster.listeners = List(listener) - SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) - } - } - thread.start - thread.interrupt - thread.join - assertEquals(1, listener.init) - assertEquals(1, listener.shutdown) - } - - @Test - def testAppMasterShouldForwardAllocatedAndCompleteContainers { - val amClient = TestUtil.getAMClient( - new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new java.util.ArrayList[Container]{ add(TestUtil.getContainer(null, "", 12345)); }, - new java.util.ArrayList[ContainerStatus]{ add(TestUtil.getContainerStatus(null, 1, null)); }) - ) - ) - val listener = new YarnAppMasterListener { - var allocated = 0 - var complete = 0 - override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "") - override def shouldShutdown = (allocated >= 1 && complete >= 1) - override def onContainerAllocated(container: Container) { - allocated += 1 - } - override def onContainerCompleted(containerStatus: ContainerStatus) { - complete += 1 - } - } - SamzaAppMaster.listeners = List(listener) - SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) - // heartbeat may be triggered for more than once - 
assertTrue(listener.allocated >= 1) - assertTrue(listener.complete >= 1) - } - - @Test - def testAppMasterShouldReboot { - val response: AllocateResponse = getAppMasterResponse( - true, - new java.util.ArrayList[Container](), - new java.util.ArrayList[ContainerStatus]()) - - val amClient = TestUtil.getAMClient( - new TestAMRMClientImpl(response)) - - val listener = new YarnAppMasterListener { - var reboot = 0 - override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "") - override def shouldShutdown = reboot >= 1 - override def onReboot() { - reboot += 1 - } - } - SamzaAppMaster.listeners = List(listener) - SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) - // heartbeat may be triggered for more than once - assertTrue(listener.reboot >= 1) - } - - /** - * This method is necessary because in Yarn 2.6, an RM reboot results in the allocate() method throwing an exception, - * rather than invoking AM_RESYNC command. However, we cannot mock out the AllocateResponse class in java because it - * will require the getAMCommand() signature to change and allow throwing an exception. This is however allowed in Scala. - * Since this is beyond our scope and we don't have a better way to mock the scenario for an RM reboot in our unit - * tests, we are keeping the following scala method for now. 
- */ - def getAppMasterResponse(reboot: Boolean, containers: java.util.List[Container], completed: java.util.List[ContainerStatus]) = - new AllocateResponse { - override def getResponseId() = 0 - override def setResponseId(responseId: Int) {} - override def getAllocatedContainers() = containers - override def setAllocatedContainers(containers: java.util.List[Container]) {} - override def getAvailableResources(): Resource = null - override def setAvailableResources(limit: Resource) {} - override def getCompletedContainersStatuses() = completed - override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {} - override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {} - override def getUpdatedNodes = new java.util.ArrayList[NodeReport]() - override def getNumClusterNodes = 1 - override def setNumClusterNodes(num: Int) {} - override def getNMTokens = new java.util.ArrayList[NMToken]() - override def setNMTokens(nmTokens: java.util.List[NMToken]) {} - override def setAMCommand(command: AMCommand) {} - override def getPreemptionMessage = null - override def setPreemptionMessage(request: PreemptionMessage) {} - override def getDecreasedContainers(): java.util.List[ContainerResourceDecrease] = java.util.Collections.emptyList[ContainerResourceDecrease] - override def getIncreasedContainers(): java.util.List[ContainerResourceIncrease] = java.util.Collections.emptyList[ContainerResourceIncrease] - override def setDecreasedContainers(decrease: java.util.List[ContainerResourceDecrease]): Unit = Unit - override def setIncreasedContainers(increase: java.util.List[ContainerResourceIncrease]): Unit = Unit - - override def getAMCommand = if (reboot) { - throw new ApplicationAttemptNotFoundException("Test - out of sync") - } else { - null - } - override def getAMRMToken: Token = null - override def setAMRMToken(amRMToken: Token): Unit = {} - } -} 
http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala deleted file mode 100644 index 750f467..0000000 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.job.yarn - -import java.nio.ByteBuffer - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl -import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.samza.SamzaException -import org.junit.Assert._ -import org.junit.Test -import org.mockito.Mockito -import java.net.URL -import org.apache.samza.coordinator.JobModelManager - -class TestSamzaAppMasterLifecycle { - val coordinator = new JobModelManager(null, null) - val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) { - var host = "" - var port = 0 - var status: FinalApplicationStatus = null - override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = { - this.host = appHostName - this.port = appHostPort - new RegisterApplicationMasterResponse { - override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]): Unit = () - override def getApplicationACLs = null - override def setMaximumResourceCapability(r: Resource): Unit = () - override def getMaximumResourceCapability = new Resource { - def getMemory = 512 - def getVirtualCores = 2 - def setMemory(memory: Int) {} - def setVirtualCores(vCores: Int) {} - def compareTo(o: Resource) = 0 - } - override def getClientToAMTokenMasterKey = null - override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {} - override def getContainersFromPreviousAttempts(): java.util.List[Container] = java.util.Collections.emptyList[Container] - override def 
getNMTokensFromPreviousAttempts(): java.util.List[NMToken] = java.util.Collections.emptyList[NMToken] - override def getQueue(): String = null - override def setContainersFromPreviousAttempts(containers: java.util.List[Container]): Unit = Unit - override def setNMTokensFromPreviousAttempts(nmTokens: java.util.List[NMToken]): Unit = Unit - override def setQueue(queue: String): Unit = Unit - - override def setSchedulerResourceTypes(types: java.util.EnumSet[SchedulerResourceTypes]): Unit = {} - override def getSchedulerResourceTypes: java.util.EnumSet[SchedulerResourceTypes] = null - } - } - override def unregisterApplicationMaster(appStatus: FinalApplicationStatus, - appMessage: String, - appTrackingUrl: String) { - this.status = appStatus - } - override def releaseAssignedContainer(containerId: ContainerId) {} - override def getClusterNodeCount() = 1 - - override def serviceInit(config: Configuration) {} - override def serviceStart() {} - override def serviceStop() {} - } - - @Test - def testLifecycleShouldRegisterOnInit { - val state = new SamzaAppState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2) - state.rpcUrl = new URL("http://localhost:1") - state.trackingUrl = new URL("http://localhost:2") - val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient) - saml.onInit - assertEquals("test", amClient.host) - assertEquals(1, amClient.port) - assertFalse(saml.shouldShutdown) - } - - @Test - def testLifecycleShouldUnregisterOnShutdown { - val state = new SamzaAppState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2) - state.status = FinalApplicationStatus.SUCCEEDED - new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown - assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status) - } - - @Test - def testLifecycleShouldThrowAnExceptionOnReboot { - var gotException = false - try { - new SamzaAppMasterLifecycle(368, 1, null, amClient).onReboot - 
} catch { - // expected - case e: SamzaException => gotException = true - } - assertTrue(gotException) - } - - @Test - def testLifecycleShouldShutdownOnInvalidContainerSettings { - val state = new SamzaAppState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2) - state.rpcUrl = new URL("http://localhost:1") - state.trackingUrl = new URL("http://localhost:2") - List(new SamzaAppMasterLifecycle(768, 1, state, amClient), - new SamzaAppMasterLifecycle(368, 3, state, amClient)).map(saml => { - saml.onInit - assertTrue(saml.shouldShutdown) - }) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala deleted file mode 100644 index 3de5614..0000000 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.job.yarn - -import java.io.BufferedReader -import java.net.URL -import java.io.InputStreamReader -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.samza.Partition -import org.apache.samza.config.MapConfig -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory} -import org.junit.Assert._ -import org.junit.Test -import scala.collection.JavaConversions._ -import org.apache.samza.config.Config -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.container.TaskName -import org.apache.samza.coordinator.JobModelManager -import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory - -class TestSamzaAppMasterService { - - @Test - def testAppMasterDashboardShouldStart { - val config = getDummyConfig - val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) - val service = new SamzaAppMasterService(config, state, null, null, null) - val taskName = new TaskName("test") - - // start the dashboard - service.onInit - assertTrue(state.rpcUrl.getPort > 0) - assertTrue(state.trackingUrl.getPort > 0) - assertTrue(state.coordinatorUrl.getPort > 0) - - // check to see if it's running - val url = new URL(state.rpcUrl.toString + "am") - val is = url.openConnection().getInputStream(); - val reader = new BufferedReader(new InputStreamReader(is)); - var line: String = null; - - do { - line = reader.readLine() - } while (line != null) - - reader.close(); - } - - /** - * This tests the rendering of the index.scaml file containing some Scala code. 
The objective - * is to ensure that the rendered scala code builds correctly - */ - @Test - def testAppMasterDashboardWebServiceShouldStart { - // Create some dummy config - val config = getDummyConfig - val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) - val service = new SamzaAppMasterService(config, state, null, null, null) - - // start the dashboard - service.onInit - assertTrue(state.rpcUrl.getPort > 0) - assertTrue(state.trackingUrl.getPort > 0) - - // Do a GET Request on the tracking port: This in turn will render index.scaml - val url = state.trackingUrl - val is = url.openConnection().getInputStream() - val reader = new BufferedReader(new InputStreamReader(is)) - var line: String = null - - do { - line = reader.readLine() - } while (line != null) - - reader.close - } - - private def getDummyConfig: Config = new MapConfig(Map[String, String]( - "job.name" -> "test", - "yarn.container.count" -> "1", - "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory", - "yarn.container.memory.mb" -> "512", - "yarn.package.path" -> "/foo", - "task.inputs" -> "test-system.test-stream", - "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde", - "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde", - "yarn.container.retry.count" -> "1", - "yarn.container.retry.window.ms" -> "1999999999", - "job.coordinator.system" -> "coordinator", - "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName)) -} - -class MockSystemFactory extends SystemFactory { - def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { - throw new RuntimeException("Hmm. Not implemented.") - } - - def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = { - throw new RuntimeException("Hmm. 
Not implemented.") - } - - def getAdmin(systemName: String, config: Config) = { - new MockSystemAdmin(config.getContainerCount) - } -} - -/** - * Helper class that returns metadata for each stream that contains numTasks partitions in it. - */ -class MockSystemAdmin(numTasks: Int) extends SystemAdmin { - def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null - def getSystemStreamMetadata(streamNames: java.util.Set[String]) = { - streamNames.map(streamName => { - var partitionMetadata = (0 until numTasks).map(partitionId => { - new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null) - }).toMap - streamName -> new SystemStreamMetadata(streamName, partitionMetadata) - }).toMap[String, SystemStreamMetadata] - } - - override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def createCoordinatorStream(streamName: String) { - new UnsupportedOperationException("Method not implemented.") - } - - override def offsetComparator(offset1: String, offset2: String) = null -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala new file mode 100644 index 0000000..2664e41 --- /dev/null +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

package org.apache.samza.job.yarn

import java.net.URL
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.samza.SamzaException
import org.apache.samza.clustermanager.SamzaApplicationState
import org.apache.samza.clustermanager.SamzaApplicationState.SamzaAppStatus
import org.apache.samza.coordinator.JobModelManager
import org.junit.Assert._
import org.junit.Test
import org.mockito.Mockito

/**
 * Unit tests for SamzaYarnAppMasterLifecycle, driven through a stubbed
 * AMRMClientAsyncImpl that records the arguments it is called with instead of
 * talking to a resource manager.
 */
class TestSamzaYarnAppMasterLifecycle {
  val coordinator = new JobModelManager(null, null)

  /**
   * Stub AM/RM client. It remembers the host and port passed to
   * registerApplicationMaster and the status passed to
   * unregisterApplicationMaster, and answers registration with a response
   * whose maximum resource capability is 512 memory / 2 virtual cores.
   */
  val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
    var host = ""
    var port = 0
    var status: FinalApplicationStatus = null

    override def registerApplicationMaster(appHostName: String,
                                           appHostPort: Int,
                                           appTrackingUrl: String): RegisterApplicationMasterResponse = {
      this.host = appHostName
      this.port = appHostPort
      new RegisterApplicationMasterResponse {
        override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]): Unit = ()
        override def getApplicationACLs = null
        override def setMaximumResourceCapability(r: Resource): Unit = ()
        // The only values the tests depend on: the cluster maximum is 512 memory and 2 cores.
        override def getMaximumResourceCapability = new Resource {
          def getMemory = 512
          def getVirtualCores = 2
          def setMemory(memory: Int): Unit = ()
          def setVirtualCores(vCores: Int): Unit = ()
          def compareTo(o: Resource) = 0
        }
        override def getClientToAMTokenMasterKey = null
        override def setClientToAMTokenMasterKey(buffer: ByteBuffer): Unit = ()
        override def getContainersFromPreviousAttempts(): java.util.List[Container] = java.util.Collections.emptyList[Container]
        override def getNMTokensFromPreviousAttempts(): java.util.List[NMToken] = java.util.Collections.emptyList[NMToken]
        override def getQueue(): String = null
        // Return the unit value `()`, not the `Unit` companion object.
        override def setContainersFromPreviousAttempts(containers: java.util.List[Container]): Unit = ()
        override def setNMTokensFromPreviousAttempts(nmTokens: java.util.List[NMToken]): Unit = ()
        override def setQueue(queue: String): Unit = ()

        override def setSchedulerResourceTypes(types: java.util.EnumSet[SchedulerResourceTypes]): Unit = ()
        override def getSchedulerResourceTypes: java.util.EnumSet[SchedulerResourceTypes] = null
      }
    }

    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
                                             appMessage: String,
                                             appTrackingUrl: String): Unit = {
      this.status = appStatus
    }

    override def releaseAssignedContainer(containerId: ContainerId): Unit = ()
    override def getClusterNodeCount() = 1

    // Service lifecycle hooks are no-ops so the stub never starts real client machinery.
    override def serviceInit(config: Configuration): Unit = ()
    override def serviceStart(): Unit = ()
    override def serviceStop(): Unit = ()
  }

  /** Builds the YarnAppState shared by every test, optionally populating the RPC and tracking URLs. */
  private def newYarnState(withUrls: Boolean): YarnAppState = {
    val yarnState = new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2)
    if (withUrls) {
      yarnState.rpcUrl = new URL("http://localhost:1")
      yarnState.trackingUrl = new URL("http://localhost:2")
    }
    yarnState
  }

  /** onInit must register with the stub client using the host/port from YarnAppState and must not request shutdown. */
  @Test
  def testLifecycleShouldRegisterOnInit: Unit = {
    val state = new SamzaApplicationState(coordinator)
    val yarnState = newYarnState(withUrls = true)

    val saml = new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient)
    saml.onInit
    assertEquals("testHost", amClient.host)
    assertEquals(1, amClient.port)
    assertFalse(saml.shouldShutdown)
  }

  /** onShutdown(SUCCEEDED) must unregister with FinalApplicationStatus.SUCCEEDED. */
  @Test
  def testLifecycleShouldUnregisterOnShutdown: Unit = {
    val state = new SamzaApplicationState(coordinator)
    val yarnState = newYarnState(withUrls = false)

    new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient).onShutdown(SamzaAppStatus.SUCCEEDED)
    assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
  }

  /** onReboot must fail with a SamzaException. */
  @Test(expected = classOf[SamzaException])
  def testLifecycleShouldThrowAnExceptionOnReboot: Unit = {
    val state = new SamzaApplicationState(coordinator)
    val yarnState = newYarnState(withUrls = false)

    new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient).onReboot()
  }

  /**
   * Asking for more memory (768 > 512) or more cores (3 > 2) than the stub's
   * maximum resource capability must flag the lifecycle for shutdown.
   */
  @Test
  def testLifecycleShouldShutdownOnInvalidContainerSettings: Unit = {
    val state = new SamzaApplicationState(coordinator)
    val yarnState = newYarnState(withUrls = true)

    val invalidLifecycles = List(
      // Request a higher amount of memory from yarn.
      new SamzaYarnAppMasterLifecycle(768, 1, state, yarnState, amClient),
      // Request a higher number of cores from yarn.
      new SamzaYarnAppMasterLifecycle(368, 3, state, yarnState, amClient))

    // foreach, not map: the loop runs only for its assertions and builds no result.
    invalidLifecycles.foreach { saml =>
      saml.onInit
      assertTrue(saml.shouldShutdown)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
new file mode 100644
index 0000000..1dd0c18
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

package org.apache.samza.job.yarn

import java.io.BufferedReader
import java.net.URL
import java.io.InputStreamReader
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.samza.Partition
import org.apache.samza.clustermanager.SamzaApplicationState
import org.apache.samza.config.MapConfig
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConversions._
import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory

/**
 * Smoke tests for SamzaYarnAppMasterService: each test initialises the service
 * against a dummy config, checks that the URLs it publishes on YarnAppState
 * carry a positive port, and reads one of those URLs to end of stream.
 */
class TestSamzaYarnAppMasterService {

  /** After onInit the RPC, tracking and coordinator URLs must have ports, and "<rpcUrl>am" must be readable. */
  @Test
  def testAppMasterDashboardShouldStart: Unit = {
    val config = getDummyConfig
    val samzaState = new SamzaApplicationState(JobModelManager(config))
    val state = newYarnAppState
    val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)

    // start the dashboard
    service.onInit
    assertTrue(state.rpcUrl.getPort > 0)
    assertTrue(state.trackingUrl.getPort > 0)
    assertTrue(state.coordinatorUrl.getPort > 0)

    // check to see if it's running
    readToEnd(new URL(state.rpcUrl.toString + "am"))
  }

  /**
   * This tests the rendering of the index.scaml file containing some Scala code. The objective
   * is to ensure that the rendered scala code builds correctly
   */
  @Test
  def testAppMasterDashboardWebServiceShouldStart: Unit = {
    // Create some dummy config
    val config = getDummyConfig
    val samzaState = new SamzaApplicationState(JobModelManager(config))
    val state = newYarnAppState
    val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)

    // start the dashboard
    service.onInit
    assertTrue(state.rpcUrl.getPort > 0)
    assertTrue(state.trackingUrl.getPort > 0)

    // Do a GET Request on the tracking port: This in turn will render index.scaml
    readToEnd(state.trackingUrl)
  }

  /** Builds the YarnAppState used by both tests. */
  private def newYarnAppState: YarnAppState =
    new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1)

  /**
   * Opens `url` and consumes the response line by line until end of stream.
   * The reader is closed in `finally` so a failure while reading does not
   * leak the underlying connection stream.
   */
  private def readToEnd(url: URL): Unit = {
    val reader = new BufferedReader(new InputStreamReader(url.openConnection().getInputStream()))
    try {
      while (reader.readLine() != null) {}
    } finally {
      reader.close()
    }
  }

  /** Minimal job config: one container, a mock input system and a mock coordinator stream system. */
  private def getDummyConfig: Config = new MapConfig(Map[String, String](
    "job.name" -> "test",
    "yarn.container.count" -> "1",
    "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
    "yarn.container.memory.mb" -> "512",
    "yarn.package.path" -> "/foo",
    "task.inputs" -> "test-system.test-stream",
    "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
    "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
    "yarn.container.retry.count" -> "1",
    "yarn.container.retry.window.ms" -> "1999999999",
    "job.coordinator.system" -> "coordinator",
    "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
}
