Repository: samza Updated Branches: refs/heads/master 9f7abf535 -> 947472a0b
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java new file mode 100644 index 0000000..57fef12 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.clustermanager; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.server.HttpServer; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestHostAwareContainerAllocator { + + private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + private final MockClusterResourceManager manager = new MockClusterResourceManager(callback); + private final Config config = getConfig(); + private final JobModelManager reader = getJobModelManager(1); + private final SamzaApplicationState state = new SamzaApplicationState(reader); + private HostAwareContainerAllocator containerAllocator; + private final int timeoutMillis = 1000; + private MockContainerRequestState requestState; + private Thread allocatorThread; + + @Before + public void setup() throws Exception { + containerAllocator = new HostAwareContainerAllocator(manager, timeoutMillis, config, state); + requestState = new MockContainerRequestState(manager, true); + Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState"); + requestStateField.setAccessible(true); + requestStateField.set(containerAllocator, 
requestState); + allocatorThread = new Thread(containerAllocator); + } + + + /** + * Test request containers with no containerToHostMapping makes the right number of requests + */ + @Test + public void testRequestContainersWithNoMapping() throws Exception { + int containerCount = 4; + Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>(); + for (int i = 0; i < containerCount; i++) { + containersToHostMapping.put(i, null); + } + + allocatorThread.start(); + + containerAllocator.requestResources(containersToHostMapping); + + assertNotNull(requestState); + + assertEquals(4, requestState.numPendingRequests()); + + assertNotNull(requestState.getRequestsToCountMap()); + assertEquals(1, requestState.getRequestsToCountMap().keySet().size()); + assertTrue(requestState.getRequestsToCountMap().keySet().contains(ResourceRequestState.ANY_HOST)); + } + + /** + * Add containers to the correct host in the request state + */ + @Test + public void testAddContainerWithHostAffinity() throws Exception { + containerAllocator.requestResources(new HashMap<Integer, String>() { + { + put(0, "abc"); + put(1, "xyz"); + } + }); + + assertNotNull(requestState.getResourcesOnAHost("abc")); + assertEquals(0, requestState.getResourcesOnAHost("abc").size()); + + assertNotNull(requestState.getResourcesOnAHost("xyz")); + assertEquals(0, requestState.getResourcesOnAHost("xyz").size()); + + assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + + containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1")); + containerAllocator.addResource(new SamzaResource(1, 10, "def", "ID2")); + containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID3")); + + + assertNotNull(requestState.getResourcesOnAHost("abc")); + assertEquals(1, requestState.getResourcesOnAHost("abc").size()); + + assertNotNull(requestState.getResourcesOnAHost("xyz")); + assertEquals(1, requestState.getResourcesOnAHost("xyz").size()); + + 
assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 1); + assertEquals("ID2", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getResourceID()); + } + + @Test + public void testAllocatorReleasesExtraContainers() throws Exception { + final SamzaResource resource0 = new SamzaResource(1, 1024, "abc", "id1"); + final SamzaResource resource1 = new SamzaResource(1, 1024, "abc", "id2"); + final SamzaResource resource2 = new SamzaResource(1, 1024, "def", "id3"); + + Runnable releasedAssertions = new Runnable() { + @Override + public void run() { + assertEquals(2, manager.releasedResources.size()); + assertTrue(manager.releasedResources.contains(resource1)); + assertTrue(manager.releasedResources.contains(resource2)); + + // Test that state is cleaned up + assertEquals(0, requestState.numPendingRequests()); + assertEquals(0, requestState.getRequestsToCountMap().size()); + assertNull(requestState.getResourcesOnAHost("abc")); + assertNull(requestState.getResourcesOnAHost("def")); + } + }; + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedAssertions, + null, null); + requestState.registerContainerListener(listener); + + allocatorThread.start(); + + containerAllocator.requestResource(0, "abc"); + + containerAllocator.addResource(resource0); + containerAllocator.addResource(resource1); + containerAllocator.addResource(resource2); + + listener.verify(); + } + + + + + @Test + public void testRequestContainers() throws Exception { + Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { + { + put(0, "abc"); + put(1, "def"); + put(2, null); + put(3, "abc"); + } + }; + + containerAllocator.requestResources(containersToHostMapping); + + assertNotNull(manager.resourceRequests); + assertEquals(manager.resourceRequests.size(), 
4); + assertEquals(requestState.numPendingRequests(), 4); + + Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap(); + assertNotNull(requestsMap.get("abc")); + assertEquals(2, requestsMap.get("abc").get()); + + assertNotNull(requestsMap.get("def")); + assertEquals(1, requestsMap.get("def").get()); + + assertNotNull(requestsMap.get(ResourceRequestState.ANY_HOST)); + assertEquals(1, requestsMap.get(ResourceRequestState.ANY_HOST).get()); + } + + /** + * If the container fails to start, e.g. because it fails to connect to an NM on a host that + * is down, the allocator should request a new container on a different host. + */ + @Test + public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { + + final SamzaResource container = new SamzaResource(1, 1024, "2", "id0"); + final SamzaResource container1 = new SamzaResource(1, 1024, "1", "id1"); + manager.nextException = new IOException("Cant connect to RM"); + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() { + @Override + public void run() { + // The failed container should be released. The successful one should not. 
+ assertNotNull(manager.releasedResources); + assertEquals(1, manager.releasedResources.size()); + assertTrue(manager.releasedResources.contains(container)); + } + }, + new Runnable() { + @Override + public void run() { + // Test that the first request assignment had a preferred host and the retry didn't + assertEquals(2, requestState.assignedRequests.size()); + + SamzaResourceRequest request = requestState.assignedRequests.remove(); + assertEquals(0, request.getContainerID()); + assertEquals("2", request.getPreferredHost()); + + request = requestState.assignedRequests.remove(); + assertEquals(0, request.getContainerID()); + assertEquals("ANY_HOST", request.getPreferredHost()); + + // This routine should be called after the retry is assigned, but before it's started. + // So there should still be 1 container needed. + assertEquals(1, state.neededResources.get()); + } + }, null + ); + state.neededResources.set(1); + requestState.registerContainerListener(listener); + + // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry) + containerAllocator.requestResource(0, "2"); + containerAllocator.addResource(container1); + containerAllocator.addResource(container); + + allocatorThread.start(); + + listener.verify(); + } + + + /** + * Handles expired requests correctly and assigns ANY_HOST + */ + + @Test + public void testExpiredRequestHandling() throws Exception { + final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1"); + final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2"); + + Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { + { + put(0, "abc"); + put(1, "def"); + } + }; + containerAllocator.requestResources(containersToHostMapping); + assertEquals(requestState.numPendingRequests(), 2); + assertNotNull(requestState.getRequestsToCountMap()); + assertNotNull(requestState.getRequestsToCountMap().get("abc")); + 
assertTrue(requestState.getRequestsToCountMap().get("abc").get() == 1); + + assertNotNull(requestState.getRequestsToCountMap().get("def")); + assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1); + + Runnable addContainerAssertions = new Runnable() { + @Override + public void run() { + assertNull(requestState.getResourcesOnAHost("xyz")); + assertNull(requestState.getResourcesOnAHost("zzz")); + assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 2); + } + }; + + Runnable assignContainerAssertions = new Runnable() { + @Override + public void run() { + assertEquals(requestState.numPendingRequests(), 0); + assertNotNull(requestState.getRequestsToCountMap()); + assertNotNull(requestState.getRequestsToCountMap().get("abc")); + assertNotNull(requestState.getRequestsToCountMap().get("def")); + } + }; + + Runnable runningContainerAssertions = new Runnable() { + @Override + public void run() { + assertTrue(manager.launchedResources.contains(resource0)); + assertTrue(manager.launchedResources.contains(resource1)); + } + }; + MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null, assignContainerAssertions, runningContainerAssertions); + requestState.registerContainerListener(listener); + ((MockClusterResourceManager) manager).registerContainerListener(listener); + containerAllocator.addResource(resource0); + containerAllocator.addResource(resource1); + allocatorThread.start(); + + listener.verify(); + } + + @After + public void teardown() throws Exception { + reader.stop(); + containerAllocator.stop(); + } + + + private static Config getConfig() { + Config config = new MapConfig(new HashMap<String, String>() { + { + put("yarn.container.count", "1"); + put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); + put("yarn.container.memory.mb", "512"); + 
put("yarn.package.path", "/foo"); + put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); + put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); + put("yarn.container.retry.count", "1"); + put("yarn.container.retry.window.ms", "1999999999"); + put("yarn.samza.host-affinity.enabled", "true"); + put("yarn.container.request.timeout.ms", "3"); + put("yarn.allocator.sleep.ms", "1"); + } + }); + + Map<String, String> map = new HashMap<>(); + map.putAll(config); + return new MapConfig(map); + } + + private static JobModelManager getJobModelManager(int containerCount) { + //Ideally, the JobModelReader should be constructed independent of HttpServer. + //That way it becomes easier to mock objects. Save it for later. + + HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); + Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); + for (int i = 0; i < containerCount; i++) { + ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); + containers.put(i, container); + } + JobModel jobModel = new JobModel(getConfig(), containers); + return new JobModelManager(jobModel, server, null); + } + + +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 9df12d2..1358fdd 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -31,7 +31,7 @@ import scala.collection.JavaConversions._ import org.apache.samza.Partition import 
org.apache.samza.config.Config import org.apache.samza.config.MapConfig -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.server.{ServletBase, HttpServer, JobServlet} import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel @@ -76,9 +76,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { val jobModel = new JobModel(config, containers) def jobModelGenerator(): JobModel = jobModel val server = new HttpServer - val coordinator = new JobCoordinator(jobModel, server) - JobCoordinator.jobModelRef.set(jobModelGenerator()) - coordinator.server.addServlet("/*", new JobServlet(JobCoordinator.jobModelRef)) + val coordinator = new JobModelManager(jobModel, server) + JobModelManager.jobModelRef.set(jobModelGenerator()) + coordinator.server.addServlet("/*", new JobServlet(JobModelManager.jobModelRef)) try { coordinator.start assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString)) @@ -101,9 +101,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { val jobModel = new JobModel(config, containers) def jobModelGenerator(): JobModel = jobModel val server = new HttpServer - val coordinator = new JobCoordinator(jobModel, server) - JobCoordinator.jobModelRef.set(jobModelGenerator()) - val mockJobServlet = new MockJobServlet(2, JobCoordinator.jobModelRef) + val coordinator = new JobModelManager(jobModel, server) + JobModelManager.jobModelRef.set(jobModelGenerator()) + val mockJobServlet = new MockJobServlet(2, JobModelManager.jobModelRef) coordinator.server.addServlet("/*", mockJobServlet) try { coordinator.start http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 110c3a9..ffdb006 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -98,12 +98,12 @@ class TestJobCoordinator { // We want the mocksystemconsumer to use the same instance across runs MockCoordinatorStreamSystemFactory.enableMockConsumerCache() - val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs)) + val coordinator = JobModelManager(new MapConfig(config ++ otherConfigs)) val expectedJobModel = new JobModel(new MapConfig(config), containers) // Verify that the atomicReference is initialized - assertNotNull(JobCoordinator.jobModelRef.get()) - assertEquals(expectedJobModel, JobCoordinator.jobModelRef.get()) + assertNotNull(JobModelManager.jobModelRef.get()) + assertEquals(expectedJobModel, JobModelManager.jobModelRef.get()) coordinator.start assertEquals(new MapConfig(config), coordinator.jobModel.getConfig) @@ -123,7 +123,6 @@ class TestJobCoordinator { @Test def testJobCoordinatorChangelogPartitionMapping = { - System.out.println("test ") val task0Name = new TaskName("Partition 0") val ssp0 = Set(new SystemStreamPartition("test", "stream1", new Partition(0))) val task1Name = new TaskName("Partition 1") @@ -165,7 +164,7 @@ class TestJobCoordinator { MockCoordinatorStreamSystemFactory.enableMockConsumerCache() // start the job coordinator and verify if it has all the checkpoints through http port - val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs)) + val coordinator = JobModelManager(new MapConfig(config ++ otherConfigs)) coordinator.start val url = coordinator.server.getUrl.toString http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala index 3a710a8..a55af50 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala @@ -19,7 +19,7 @@ package org.apache.samza.job.local; -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import org.junit.Assert._ import org.junit.Test import org.apache.samza.job.ApplicationStatus @@ -33,7 +33,7 @@ class TestProcessJob { override def buildCommand = "sleep 1" override def buildEnvironment = Map[String, String]() } - val coordinator = new MockJobCoordinator() + val coordinator = new MockJobModelManager() val job = new ProcessJob(commandBuilder, coordinator) job.submit job.waitForFinish(999999) @@ -45,7 +45,7 @@ class TestProcessJob { override def buildCommand = "sleep 999999999" override def buildEnvironment = Map[String, String]() } - val coordinator = new MockJobCoordinator() + val coordinator = new MockJobModelManager() val job = new ProcessJob(commandBuilder, coordinator) job.submit job.waitForFinish(500) @@ -56,7 +56,7 @@ class TestProcessJob { } } -class MockJobCoordinator extends JobCoordinator(null, null) { +class MockJobModelManager extends JobModelManager(null, null) { var stopped: Boolean = false override def start: Unit = { } http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index 0c6329e..ca4eb7f 100644 --- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -32,7 +32,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.Log4jSystemConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.config.ShellCommandConfig; -import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory; import org.apache.samza.metrics.MetricsRegistryMap; @@ -103,7 +103,7 @@ public class StreamAppender extends AppenderSkeleton { try { recursiveCall.set(true); if (!systemInitialized) { - if (JobCoordinator.currentJobCoordinator() != null) { + if (JobModelManager.currentJobModelManager() != null) { // JobCoordinator has been instantiated setupSystem(); systemInitialized = true; @@ -173,7 +173,7 @@ public class StreamAppender extends AppenderSkeleton { try { if (isApplicationMaster) { - config = JobCoordinator.currentJobCoordinator().jobModel().getConfig(); + config = JobModelManager.currentJobModelManager().jobModel().getConfig(); } else { String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL()); config = SamzaObjectMapper.getObjectMapper() http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-shell/src/main/bash/run-am.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/run-am.sh b/samza-shell/src/main/bash/run-am.sh index 9545a96..ca938cc 100755 --- a/samza-shell/src/main/bash/run-am.sh +++ b/samza-shell/src/main/bash/run-am.sh @@ -22,4 +22,4 @@ # Set container name system properties for use in Log4J [[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-application-master" -exec $(dirname $0)/run-class.sh 
org.apache.samza.job.yarn.SamzaAppMaster "$@" +exec $(dirname $0)/run-class.sh org.apache.samza.job.yarn.SamzaAppMaster "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-shell/src/main/bash/run-jc.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/run-jc.sh b/samza-shell/src/main/bash/run-jc.sh new file mode 100644 index 0000000..cf3e734 --- /dev/null +++ b/samza-shell/src/main/bash/run-jc.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Check if server is set. 
If not - set server optimization +[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server" + +# Set container name system properties for use in Log4J +[[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-job-coordinator" + +exec $(dirname $0)/run-class.sh org.apache.samza.clustermanager.ClusterBasedJobCoordinator "$@" http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java index 77280ba..c116ed8 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java @@ -23,7 +23,7 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; import org.apache.hadoop.yarn.api.records.ContainerStatus; import java.net.URL; @@ -45,7 +45,7 @@ public class SamzaAppState { * lifecycle. 
It helps querying JobModel related info in {@link org.apache.samza.webapp.ApplicationMasterRestServlet} * and locality information when host-affinity is enabled in {@link org.apache.samza.job.yarn.SamzaTaskManager} */ - public final JobCoordinator jobCoordinator; + public final JobModelManager jobCoordinator; /* The following state variables are primarily used for reference in the AM web services */ /** @@ -168,13 +168,13 @@ public class SamzaAppState { public AtomicInteger matchedContainerRequests = new AtomicInteger(0); - public SamzaAppState(JobCoordinator jobCoordinator, + public SamzaAppState(JobModelManager jobModelManager, int taskId, ContainerId amContainerId, String nodeHost, int nodePort, int nodeHttpPort) { - this.jobCoordinator = jobCoordinator; + this.jobCoordinator = jobModelManager; this.taskId = taskId; this.amContainerId = amContainerId; this.nodeHost = nodeHost; http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java index caee6e6..bc95f31 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java @@ -139,7 +139,7 @@ class SamzaTaskManager implements YarnAppMasterListener { } /** - * This methods handles the onContainerCompleted callback from the RM. Based on the ContainerExitStatus, it decides + * This methods handles the onResourceCompleted callback from the RM. Based on the ContainerExitStatus, it decides * whether a container that exited is marked as complete or failure. 
*/ @Override http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java new file mode 100644 index 0000000..57092e1 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.samza.clustermanager.SamzaApplicationState; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.job.yarn.YarnContainer; + +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * YarnAppState encapsulates Yarn specific state variables that are Yarn specific. 
This class + * is useful for holding information to display in the UI. + * + * TODO: make these variables private, provide thread-safe accessors. + * Changes to the variables in YarnAppState are deferred because it is used by the UI, and changes to + * variable names, data structures, etc. will require changes to the UI scaml templates too. This is tracked + * as a part of SAMZA-902. + */ + +public class YarnAppState { + + /** + /** + * State indicating whether the job is healthy or not + * Modified by both the AMRMCallbackThread and the ContainerAllocator thread + */ + + public Map<Integer, YarnContainer> runningYarnContainers = new ConcurrentHashMap<Integer, YarnContainer>() ; + + public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>(); + + public YarnAppState(JobModelManager jobModelManager, + int taskId, + ContainerId amContainerId, + String nodeHost, + int nodePort, + int nodeHttpPort, + SamzaApplicationState state) { + this.jobModelManager = jobModelManager; + this.taskId = taskId; + this.amContainerId = amContainerId; + this.nodeHost = nodeHost; + this.nodePort = nodePort; + this.nodeHttpPort = nodeHttpPort; + this.appAttemptId = amContainerId.getApplicationAttemptId(); + this.samzaAppState = state; + } + + + @Override + public String toString() { + return "YarnAppState{" + + "samzaAppState=" + samzaAppState + + ", jobModelReader=" + jobModelManager + + ", taskId=" + taskId + + ", amContainerId=" + amContainerId + + ", nodeHost='" + nodeHost + '\'' + + ", nodePort=" + nodePort + + ", nodeHttpPort=" + nodeHttpPort + + ", appAttemptId=" + appAttemptId + + ", coordinatorUrl=" + coordinatorUrl + + ", rpcUrl=" + rpcUrl + + ", trackingUrl=" + trackingUrl + + ", runningYarnContainers=" + runningYarnContainers + + ", failedContainersStatus=" + failedContainersStatus + + '}'; + } + + public final SamzaApplicationState samzaAppState; + /* The following state variables are primarily used for reference in the AM web 
services */ + + /** + * Task Id of the AM + * Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet} + * and scalate/WEB-INF/views/index.scaml + */ + public final JobModelManager jobModelManager; + + public final int taskId; + /** + * Id of the AM container (as allocated by the RM) + * Used for displaying in the AM UI. Usage in {@link org.apache.samza.webapp.ApplicationMasterRestServlet} + * and scalate/WEB-INF/views/index.scaml + */ + public final ContainerId amContainerId; + /** + * Host name of the NM on which the AM is running + * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml + */ + public final String nodeHost; + /** + * NM port on which the AM is running + * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml + */ + public final int nodePort; + /** + * Http port of the NM on which the AM is running + * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml + */ + public final int nodeHttpPort; + /** + * Application Attempt Id as provided by Yarn + * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml + * and {@link org.apache.samza.webapp.ApplicationMasterRestServlet} + */ + public final ApplicationAttemptId appAttemptId; + + //TODO: Make the below 3 variables immutable. Tracked as a part of SAMZA-902. Save for later. 
+ /** + * Job Coordinator URL + * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} & YarnContainerRunner + */ + public URL coordinatorUrl = null; + /** + * URL of the {@link org.apache.samza.webapp.ApplicationMasterRestServlet} + */ + public URL rpcUrl = null; + /** + * URL of the {@link org.apache.samza.webapp.ApplicationMasterWebServlet} + */ + public URL trackingUrl = null; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java new file mode 100644 index 0000000..7778a38 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn; + +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.clustermanager.*; +import org.apache.samza.clustermanager.SamzaApplicationState; +import org.apache.samza.clustermanager.SamzaContainerLaunchException; +import org.apache.samza.config.Config; +import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.config.YarnConfig; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.job.CommandBuilder; +import org.apache.samza.job.yarn.YarnContainer; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.util.hadoop.HttpFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + * A {@link YarnClusterResourceManager} implements a ClusterResourceManager using Yarn as the underlying + * resource manager. This class acts as an adaptor between Yarn and Samza: it translates Yarn callbacks into + * Samza specific callback methods as specified in Callback. + * + * Thread-safety: + * 1. Start and stop methods should NOT be called from multiple threads. + * 2. ALL callbacks from the YarnContainerManager are invoked from a single Callback thread of the AMRMClient. + * 3. Stop should not be called more than once. 
+ * + */

public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler {

  /**
   * Sentinel returned by {@link #getIDForContainer(String)} when no running container matches the lookup.
   */
  private static final int INVALID_YARN_CONTAINER_ID = -1;

  private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);

  /**
   * The containerProcessManager instance to request resources from yarn.
   */
  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;

  /**
   * A helper class to launch Yarn containers.
   */
  private final YarnContainerRunner yarnContainerRunner;

  /**
   * Configuration and state specific to Yarn.
   */
  private final YarnConfiguration hConfig;
  private final YarnAppState state;

  /**
   * SamzaYarnAppMasterLifecycle is responsible for registering, unregistering the AM client.
   */
  private final SamzaYarnAppMasterLifecycle lifecycle;

  /**
   * SamzaAppMasterService is responsible for hosting an AM web UI. This picks up data from both
   * SamzaAppState and YarnAppState.
   */
  private final SamzaYarnAppMasterService service;

  /**
   * State variables to map Yarn specific callbacks into Samza specific callbacks.
   */
  private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();

  /**
   * Flipped to true by the first call to {@link #start()}; later calls see it set and return immediately.
   */
  final AtomicBoolean started = new AtomicBoolean(false);

  /**
   * Guards every "update a map + call the AM client" pair so that the two steps happen atomically.
   */
  private final Object lock = new Object();

  /**
   * Creates a YarnClusterResourceManager from config, a jobModelReader and a callback.
   * The AM container ID and the NM host/ports are read from the Yarn-provided environment variables.
   *
   * @param config to instantiate the container manager with
   * @param jobModelManager the jobModel manager to get the job model (mostly for the UI)
   * @param callback the callback to receive events from Yarn.
   * @param samzaAppState samza app state for display in the UI
   */
  public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
    super(callback);
    hConfig = new YarnConfiguration();
    hConfig.set("fs.http.impl", HttpFileSystem.class.getName());

    MetricsRegistryMap registry = new MetricsRegistryMap();

    // parse configs from the Yarn environment
    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
    String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
    String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());

    int nodePort = Integer.parseInt(nodePortString);
    int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
    YarnConfig yarnConfig = new YarnConfig(config);
    int interval = yarnConfig.getAMPollIntervalMs();

    //Instantiate the AM Client.
    this.amClient = AMRMClientAsync.createAMRMClientAsync(interval, this);

    this.state = new YarnAppState(jobModelManager, -1, containerId, nodeHostString, nodePort, nodeHttpPort, samzaAppState);

    log.info("Initialized YarnAppState: {}", state.toString());
    this.service = new SamzaYarnAppMasterService(config, this.state, registry);

    log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object[]{containerIdStr, nodeHostString, nodePort, nodeHttpPort});
    this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), state, amClient);

    yarnContainerRunner = new YarnContainerRunner(config, hConfig);
  }

  /**
   * Starts the YarnContainerManager and initialize all its sub-systems.
   * Attempting to start an already started container manager will return immediately.
   * If the lifecycle reports that it should shut down after initialization, the error is forwarded to the
   * cluster manager callback.
   */
  @Override
  public void start() {
    if (!started.compareAndSet(false, true)) {
      log.info("Attempting to start an already started ContainerManager");
      return;
    }
    service.onInit();
    log.info("Starting YarnContainerManager.");
    amClient.init(hConfig);
    amClient.start();
    lifecycle.onInit();

    if (lifecycle.shouldShutdown()) {
      clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
    }

    log.info("Finished starting YarnContainerManager");
  }

  /**
   * Request resources for running container processes.
   * A preferred host of "ANY_HOST" produces an unconstrained Yarn request; any other value is passed to Yarn as
   * the single requested node.
   *
   * @param resourceRequest the Samza request describing memory, cores and preferred host
   */
  @Override
  public void requestResources(SamzaResourceRequest resourceRequest) {
    final int DEFAULT_PRIORITY = 0;
    log.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());

    int memoryMb = resourceRequest.getMemoryMB();
    int cpuCores = resourceRequest.getNumCores();
    String preferredHost = resourceRequest.getPreferredHost();
    Resource capability = Resource.newInstance(memoryMb, cpuCores);
    Priority priority = Priority.newInstance(DEFAULT_PRIORITY);

    AMRMClient.ContainerRequest issuedRequest;

    if (preferredHost.equals("ANY_HOST")) {
      log.info("Making a request for ANY_HOST " + preferredHost);
      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority);
    } else {
      log.info("Making a preferred host request on " + preferredHost);
      issuedRequest = new AMRMClient.ContainerRequest(
          capability,
          new String[]{preferredHost},
          null,
          priority);
    }
    //ensure that updating the state and making the request are done atomically.
    synchronized (lock) {
      requestsMap.put(resourceRequest, issuedRequest);
      amClient.addContainerRequest(issuedRequest);
    }
  }

  /**
   * Requests the YarnContainerManager to release a resource. If the app cannot use the resource or wants to give up
   * the resource, it can release them. Releasing a resource that is no longer tracked is a no-op.
   *
   * @param resource to be released
   */
  @Override
  public void releaseResources(SamzaResource resource) {
    log.info("Release resource invoked {} ", resource);
    //ensure that updating state and removing the request are done atomically
    synchronized (lock) {
      Container container = allocatedResources.get(resource);
      if (container == null) {
        log.info("Resource {} already released. ", resource);
        return;
      }
      amClient.releaseAssignedContainer(container.getId());
      allocatedResources.remove(resource);
    }
  }

  /**
   * Requests the launch of a StreamProcessor with the specified ID on the resource.
   * The Samza container ID is read from the builder's environment. If the resource is no longer tracked
   * (already released), the request is logged and ignored.
   *
   * @param resource the SamzaResource on which to launch the StreamProcessor
   * @param builder the builder to build the resource launch command from
   * @throws SamzaContainerLaunchException propagated from the container runner when the launch fails
   *
   * TODO: Support non-builder methods to launch resources. Maybe, refactor into a ContainerLaunchStrategy interface
   */
  @Override
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
    String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
    int containerID = Integer.parseInt(containerIDStr);
    log.info("Received launch request for {} on hostname {}", containerID, resource.getHost());

    synchronized (lock) {
      Container container = allocatedResources.get(resource);
      if (container == null) {
        log.info("Resource {} already released. ", resource);
        return;
      }

      state.runningYarnContainers.put(containerID, new YarnContainer(container));
      yarnContainerRunner.runContainer(containerID, container, builder);
    }
  }

  /**
   * Given a lookupContainerId from Yarn (for example: containerId_app_12345), this method returns the SamzaContainer ID
   * in the range [0,N-1] that maps to it.
   *
   * TODO: Get rid of the YarnContainer object and just use Container in state.runningYarnContainers hashmap.
   * In that case, this scan will turn into a lookup. This change will require changes/testing in the UI files because
   * those UI stub templates operate on the YarnContainer object.
   *
   * @param lookupContainerId the Yarn container ID.
   * @return the samza container ID, or {@link #INVALID_YARN_CONTAINER_ID} when no running container matches.
   */
  private int getIDForContainer(String lookupContainerId) {
    for (Map.Entry<Integer, YarnContainer> entry : state.runningYarnContainers.entrySet()) {
      String yarnContainerId = entry.getValue().id().toString();
      if (yarnContainerId.equals(lookupContainerId)) {
        return entry.getKey();
      }
    }
    return INVALID_YARN_CONTAINER_ID;
  }

  /**
   * Remove a previously submitted resource request. The previous container request may have
   * been submitted. Even after the remove request, a Callback implementation must
   * be prepared to receive an allocation for the previous request. This is merely a best effort cancellation.
   *
   * @param request the request to be cancelled
   */
  @Override
  public void cancelResourceRequest(SamzaResourceRequest request) {
    log.info("Cancelling request {} ", request);
    //ensure that removal and cancellation are done atomically.
    synchronized (lock) {
      AMRMClient.ContainerRequest containerRequest = requestsMap.remove(request);
      if (containerRequest == null) {
        // Log the Samza request: the Yarn request is null on this path and carries no information.
        log.info("Cancellation of {} already done. ", request);
        return;
      }
      amClient.removeContainerRequest(containerRequest);
    }
  }

  /**
   * Stops the YarnContainerManager and all its sub-components.
   * Stop should NOT be called from multiple threads.
   * TODO: fix this to make stop idempotent?.
   *
   * @param status the final application status handed to the lifecycle's shutdown hook
   */
  @Override
  public void stop(SamzaApplicationState.SamzaAppStatus status) {
    log.info("Stopping AM client ");
    lifecycle.onShutdown(status);
    amClient.stop();
    log.info("Stopping the AM service ");
    service.onShutdown();
  }

  /**
   * Callback invoked from Yarn when containers complete. This translates the yarn callbacks into Samza specific
   * ones. Every status is forwarded to the cluster manager callback; statuses that belong to a running container
   * additionally update the running/failed bookkeeping in {@link YarnAppState}.
   *
   * @param statuses the YarnContainerStatus callbacks from Yarn.
   */
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<>();

    for (ContainerStatus status : statuses) {
      log.info("Container completed from RM " + status);

      String yarnContainerId = status.getContainerId().toString();
      samzaResrcStatuses.add(new SamzaResourceStatus(yarnContainerId, status.getDiagnostics(), status.getExitStatus()));

      int completedContainerID = getIDForContainer(yarnContainerId);
      log.info("Completed container had ID: {}", completedContainerID);

      //remove the container from the list of running containers, if failed with a non-zero exit code, add it to the list of
      //failed containers.
      if (completedContainerID != INVALID_YARN_CONTAINER_ID
          && state.runningYarnContainers.containsKey(completedContainerID)) {
        log.info("Removing container ID {} from running containers", completedContainerID);
        state.runningYarnContainers.remove(completedContainerID);

        if (status.getExitStatus() != ContainerExitStatus.SUCCESS) {
          state.failedContainersStatus.put(yarnContainerId, status);
        }
      }
    }
    clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
  }

  /**
   * Callback invoked from Yarn when containers are allocated. This translates the yarn callbacks into Samza
   * specific ones and remembers the Yarn container behind each Samza resource.
   *
   * @param containers the list of {@link Container} returned by Yarn.
   */
  @Override
  public void onContainersAllocated(List<Container> containers) {
    List<SamzaResource> resources = new ArrayList<>();
    for (Container container : containers) {
      String host = container.getNodeId().getHost();
      log.info("Container allocated from RM on " + host);
      final String id = container.getId().toString();
      int memory = container.getResource().getMemory();
      int numCores = container.getResource().getVirtualCores();

      SamzaResource resource = new SamzaResource(numCores, memory, host, id);
      allocatedResources.put(resource, container);
      resources.add(resource);
    }
    clusterManagerCallback.onResourcesAvailable(resources);
  }

  //The below methods are specific to the Yarn AMRM Client. We currently don't handle scenarios where there are
  //nodes being updated. We always return 0 when asked for progress by Yarn.
  @Override
  public void onShutdownRequest() {
    //not implemented currently.
  }

  @Override
  public void onNodesUpdated(List<NodeReport> updatedNodes) {
    //not implemented currently.
  }

  @Override
  public float getProgress() {
    //not implemented currently.
    return 0;
  }

  /**
   * Callback invoked when there is an error in the Yarn client. This delegates the
   * callback handling to the {@link ClusterResourceManager.Callback} instance.
   *
   * @param e the error reported by the Yarn client
   */
  @Override
  public void onError(Throwable e) {
    // No "{}" placeholder: the throwable is passed as the trailing argument so the stack trace is logged.
    log.error("Exception in the Yarn callback", e);
    clusterManagerCallback.onError(e);
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java new file mode 100644 index 0000000..dacc52d --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.samza.job.yarn;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

/**
 * A Helper class to run container processes on Yarn. This encapsulates quite a bit of YarnContainer
 * boiler plate.
 */
public class YarnContainerRunner {
  private static final Logger log = LoggerFactory.getLogger(YarnContainerRunner.class);

  private final Config config;
  private final YarnConfiguration yarnConfiguration;

  private final NMClient nmClient;
  private final YarnConfig yarnConfig;

  /**
   * Create a new Runner from a Config.
   * @param config to instantiate the runner with
   * @param yarnConfiguration the yarn config for the cluster to connect to.
   */
  public YarnContainerRunner(Config config,
                             YarnConfiguration yarnConfiguration) {
    this.config = config;
    this.yarnConfiguration = yarnConfiguration;

    this.nmClient = NMClient.createNMClient();
    nmClient.init(this.yarnConfiguration);

    this.yarnConfig = new YarnConfig(config);
  }

  /**
   * Runs a process as specified by the command builder on the container.
   * @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
   * @param container the Yarn container on which to run the process.
   * @param cmdBuilder the command builder that encapsulates the command, and the context
   *
   * @throws SamzaContainerLaunchException when there's an exception in preparing or submitting the launch request.
   */
  public void runContainer(int samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
    String containerIdStr = ConverterUtils.toString(container.getId());
    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);

    // check if we have framework path specified. If yes - use it, if not use default ./__package/
    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
    String cmdPath = "./__package/";

    String fwkPath = JobConfig.getFwkPath(config);
    if (fwkPath != null && !fwkPath.isEmpty()) {
      cmdPath = fwkPath;
      jobLib = "export JOB_LIB_DIR=./__package/lib";
    }
    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
    cmdBuilder.setCommandPath(cmdPath);

    String command = cmdBuilder.buildCommand();
    log.info("Container ID {} using command {}", samzaContainerId, command);

    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
    printContainerEnvironmentVariables(samzaContainerId, env);

    log.info("Samza FWK path: " + command + "; env=" + env);

    Path path = new Path(yarnConfig.getPackagePath());
    log.info("Starting container ID {} using package path {}", samzaContainerId, path);

    startContainer(
        path,
        container,
        env,
        getFormattedCommand(
            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
            jobLib,
            command,
            ApplicationConstants.STDOUT,
            ApplicationConstants.STDERR)
    );

    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
        new Object[]{
            samzaContainerId,
            containerIdStr,
            container.getNodeId().getHost(),
            container.getNodeHttpAddress(),
            containerIdStr}
    );

    log.info("Started container ID {}", samzaContainerId);
  }

  /**
   * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the URL
   * specified by packagePath.
   *
   * @param packagePath location of the job package; registered as the "__package" local resource
   * @param container the Yarn container to start
   * @param env the environment variables for the launched process
   * @param cmd the shell command to execute in the container
   * @throws SamzaContainerLaunchException when the package status cannot be read, the credentials cannot be
   *         serialized, or the node manager rejects the start request; the underlying exception is the cause
   */
  private void startContainer(Path packagePath,
                              Container container,
                              Map<String, String> env,
                              final String cmd) throws SamzaContainerLaunchException {
    log.info("starting container {} {} {} {}",
        new Object[]{packagePath, container, env, cmd});

    // set the local package so that the containers and app master are provisioned with it
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
    FileStatus fileStatus;
    try {
      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
    } catch (IOException ioe) {
      log.error("IO Exception when accessing the package status from the filesystem", ioe);
      throw new SamzaContainerLaunchException("IO Exception when accessing the package status from the filesystem", ioe);
    }

    packageResource.setResource(packageUrl);
    packageResource.setSize(fileStatus.getLen());
    packageResource.setTimestamp(fileStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    ByteBuffer allTokens;
    // copy tokens (adapted from the dist shell example)
    try {
      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

      // Remove the AM->RM token so that containers cannot access it. This has to happen BEFORE the
      // credentials are serialized; removing it afterwards leaves the token in the bytes sent to the container.
      // The kind is read from the token itself, so tokens whose identifier cannot be decoded are handled too.
      Iterator<Token<? extends TokenIdentifier>> iter = credentials.getAllTokens().iterator();
      while (iter.hasNext()) {
        if (AMRMTokenIdentifier.KIND_NAME.equals(iter.next().getKind())) {
          iter.remove();
        }
      }

      DataOutputBuffer dob = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dob);
      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    } catch (IOException ioe) {
      log.error("IOException when writing credentials.", ioe);
      throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer", ioe);
    }

    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
    context.setEnvironment(env);
    context.setTokens(allTokens.duplicate());
    context.setCommands(Collections.singletonList(cmd));
    context.setLocalResources(Collections.singletonMap("__package", packageResource));

    log.debug("setting package to {}", packageResource);
    log.debug("setting context to {}", context);

    try {
      nmClient.startContainer(container, context);
    } catch (YarnException ye) {
      log.error("Received YarnException when starting container: " + container.getId(), ye);
      throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
    } catch (IOException ioe) {
      log.error("Received IOException when starting container: " + container.getId(), ioe);
      throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
    }
  }

  /**
   * Logs every environment variable, one {@code key=value} pair per line.
   *
   * @param samzaContainerId the Samza container Id for logging purposes.
   * @param env the Map of environment variables to their respective values.
   */
  private void printContainerEnvironmentVariables(int samzaContainerId, Map<String, String> env) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : env.entrySet()) {
      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
    }
    log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
  }

  /**
   * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
   *
   * @param cmdBuilder the command builder containing the environment variables.
   * @return the map containing the escaped environment variables.
   */
  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
    Map<String, String> env = new HashMap<String, String>();
    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
      String escapedValue = Util.envVarEscape(entry.getValue());
      env.put(entry.getKey(), escapedValue);
    }
    return env;
  }

  /**
   * Builds the shell line executed in the container: exports SAMZA_LOG_DIR (and the optional job-lib export),
   * links {@code logs} to the log directory and execs the command with stdout/stderr redirected into it.
   *
   * @param logDirExpansionVar the log directory expression
   * @param jobLib an extra export statement, or the empty string when there is none
   * @param command the command to exec
   * @param stdOut file name for standard output, relative to {@code logs}
   * @param stdErr file name for standard error, relative to {@code logs}
   * @return the formatted shell command
   */
  private String getFormattedCommand(String logDirExpansionVar,
                                     String jobLib,
                                     String command,
                                     String stdOut,
                                     String stdErr) {
    // add job's libraries exported to an env variable
    String jobLibClause = jobLib.isEmpty() ? jobLib : "&& " + jobLib;

    return String
        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
            jobLibClause, logDirExpansionVar, command, stdOut, stdErr);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java new file mode 100644 index 0000000..988a8e8 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.job.yarn;

import org.apache.samza.clustermanager.ClusterResourceManager;
import org.apache.samza.clustermanager.ResourceManagerFactory;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.JobModelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A YarnResourceManagerFactory returns an implementation of a {@link ClusterResourceManager} for Yarn.
 */
public class YarnResourceManagerFactory implements ResourceManagerFactory {

  private static final Logger log = LoggerFactory.getLogger(YarnResourceManagerFactory.class);

  /**
   * Builds a {@link YarnClusterResourceManager}, taking the job model manager from the supplied state and the
   * config from that manager's job model.
   *
   * @param callback the callback that receives cluster events
   * @param state the Samza application state; supplies the job model manager
   * @return a new Yarn-backed cluster resource manager
   */
  @Override
  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
    log.info("Creating an instance of a cluster resource manager for Yarn. ");
    JobModelManager jobModelManager = state.jobModelManager;
    Config config = jobModelManager.jobModel().getConfig();
    return new YarnClusterResourceManager(config, jobModelManager, callback, state);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index 70f1e4f..c47e8d1 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -35,8 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.container.SamzaContainerMetrics; -import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.yarn.ClientHelper; import org.apache.samza.metrics.JmxMetricsAccessor; @@ -149,9 +148,9 @@ public class YarnJobValidationTool { } public void validateJmxMetrics() throws Exception { - JobCoordinator jobCoordinator = JobCoordinator.apply(config); + JobModelManager jobModelManager = JobModelManager.apply(config); validator.init(config); - Map<Integer, String> jmxUrls = jobCoordinator.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY); + Map<Integer, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY); for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) { Integer containerId 
= entry.getKey(); String jmxUrl = entry.getValue(); http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala index 80deb3b..7bd8131 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala @@ -36,7 +36,7 @@ import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.util.hadoop.HttpFileSystem import org.apache.samza.util.Logging import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import org.apache.samza.SamzaException /** @@ -71,7 +71,7 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { val coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG), classOf[Config])) info("got coordinator system config: %s" format coordinatorSystemConfig) val registry = new MetricsRegistryMap - val jobCoordinator = JobCoordinator(coordinatorSystemConfig, registry) + val jobCoordinator = JobModelManager(coordinatorSystemConfig, registry) val config = jobCoordinator.jobModel.getConfig val yarnConfig = new YarnConfig(config) info("got config: %s" format coordinatorSystemConfig) http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala new file mode 100644 index 0000000..2ed9baf --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync +import org.apache.samza.SamzaException +import org.apache.samza.clustermanager.SamzaApplicationState +import SamzaApplicationState.SamzaAppStatus +import org.apache.samza.util.Logging + +/** + * Responsible for managing the lifecycle of the Yarn application master. Mostly, + * this means registering and unregistering with the RM, and shutting down + * when the RM tells us to Reboot. 
+ */ +//This class is used in the refactored code path as called by run-jc.sh +class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging { + var validResourceRequest = true + var shutdownMessage: String = null + var webApp: SamzaYarnAppMasterService = null + def onInit() { + val host = state.nodeHost + val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort)) + + // validate that the YARN cluster can handle our container resource requirements + val maxCapability = response.getMaximumResourceCapability + val maxMem = maxCapability.getMemory + val maxCpu = maxCapability.getVirtualCores + info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu)) + + if (containerMem > maxMem || containerCpu > maxCpu) { + shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." 
format (containerMem, containerCpu) + error(shutdownMessage) + validResourceRequest = false + state.samzaAppState.status = SamzaAppStatus.FAILED; + state.samzaAppState.jobHealthy.set(false) + } + } + + def onReboot() { + throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.") + } + + def onShutdown(samzaAppStatus: SamzaAppStatus) { + val yarnStatus: FinalApplicationStatus = getStatus(samzaAppStatus) + info("Shutting down SamzaAppStatus: " + samzaAppStatus + " yarn status: " + yarnStatus) + //The value of state.status is set to either SUCCEEDED or FAILED for errors we catch and handle - like container failures + //All other AM failures (errors in callbacks/connection failures after retries/token expirations) should not unregister the AM, + //allowing the RM to restart it (potentially on a different host) + if(samzaAppStatus != SamzaAppStatus.UNDEFINED) { + info("Unregistering AM from the RM.") + amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null) + info("Unregister complete.") + } + else { + info("Not unregistering AM from the RM. 
This will enable RM retries") + } + } + + def getStatus(samzaAppStatus: SamzaAppStatus): FinalApplicationStatus = { + if (samzaAppStatus == SamzaAppStatus.FAILED) + return FinalApplicationStatus.FAILED + if(samzaAppStatus == SamzaAppStatus.SUCCEEDED) + return FinalApplicationStatus.SUCCEEDED + + return FinalApplicationStatus.UNDEFINED + } + + + def shouldShutdown = !validResourceRequest +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala new file mode 100644 index 0000000..f62bec1 --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn + +import org.apache.samza.config.Config +import org.apache.samza.coordinator.server.HttpServer +import org.apache.samza.coordinator.stream.CoordinatorStreamWriter +import org.apache.samza.coordinator.stream.messages.SetConfig +import org.apache.samza.metrics.ReadableMetricsRegistry +import org.apache.samza.util.Logging + +/** + * Samza's application master runs a very basic HTTP/JSON service to allow + * dashboards to check on the status of a job. SamzaAppMasterService starts + * up the web service when initialized. + */ +//This class is used in the refactored code path as called by run-jc.sh + +class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: ReadableMetricsRegistry) extends Logging { + var rpcApp: HttpServer = null + var webApp: HttpServer = null + val SERVER_URL_OPT: String = "samza.autoscaling.server.url" + + def onInit() { + // try starting the samza AM dashboard at a random rpc and tracking port + info("Starting webapp at a random rpc and tracking port") + + rpcApp = new HttpServer(resourceBasePath = "scalate") + //TODO: Since the state has changed into Samza specific and Yarn specific states, this UI has to be refactored too. 
+ //rpcApp.addServlet("/*", refactor ApplicationMasterRestServlet(config, state, registry)) + rpcApp.start + + webApp = new HttpServer(resourceBasePath = "scalate") + //webApp.addServlet("/*", refactor ApplicationMasterWebServlet(config, state)) + webApp.start + + state.jobModelManager.start + state.rpcUrl = rpcApp.getUrl + state.trackingUrl = webApp.getUrl + state.coordinatorUrl = state.jobModelManager.server.getUrl + + //write server url to coordinator stream + val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config) + coordinatorStreamWriter.start() + coordinatorStreamWriter.sendMessage(SetConfig.TYPE, SERVER_URL_OPT, state.coordinatorUrl.toString) + coordinatorStreamWriter.stop() + debug("Sent server url message with value: %s " format state.coordinatorUrl.toString) + + info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl)) + } + + def onShutdown() { + if (rpcApp != null) { + rpcApp.stop + } + + if (webApp != null) { + webApp.stop + } + + state.jobModelManager.stop + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java index 5badd29..0bbd48d 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.samza.config.Config; import org.apache.samza.config.YarnConfig; import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.JobCoordinator; +import 
org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.server.HttpServer; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; @@ -77,14 +77,14 @@ public abstract class TestContainerAllocatorCommon { protected abstract Config getConfig(); protected abstract MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient); - private JobCoordinator getCoordinator(int containerCount) { + private JobModelManager getCoordinator(int containerCount) { Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); for (int i = 0; i < containerCount; i++) { ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); containers.put(i, container); } JobModel jobModel = new JobModel(getConfig(), containers); - return new JobCoordinator(jobModel, server, null); + return new JobModelManager(jobModel, server, null); } http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java index faa697d..d747b81 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java @@ -33,7 +33,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.YarnConfig; import org.apache.samza.container.LocalityManager; import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.server.HttpServer; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import 
org.apache.samza.job.model.ContainerModel; @@ -98,7 +98,7 @@ public class TestSamzaTaskManager { private SamzaAppState state = null; private HttpServer server = null; - private JobCoordinator getCoordinator(int containerCount) { + private JobModelManager getCoordinator(int containerCount) { Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); for (int i = 0; i < containerCount; i++) { ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); @@ -114,8 +114,8 @@ public class TestSamzaTaskManager { when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap); JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager); - JobCoordinator.jobModelRef().getAndSet(jobModel); - return new JobCoordinator(jobModel, server, null); + JobModelManager.jobModelRef().getAndSet(jobModel); + return new JobModelManager(jobModel, server, null); } @Before http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala index 30cf34f..750f467 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala @@ -34,10 +34,10 @@ import org.junit.Assert._ import org.junit.Test import org.mockito.Mockito import java.net.URL -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager class TestSamzaAppMasterLifecycle { - val coordinator = new JobCoordinator(null, null) + val coordinator = new JobModelManager(null, null) val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) 
{ var host = "" var port = 0 http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala index 7f5d9f4..fc0091f 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala @@ -34,7 +34,7 @@ import scala.collection.JavaConversions._ import org.apache.samza.config.Config import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.container.TaskName -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory class TestSamzaAppMasterService { @@ -42,7 +42,7 @@ class TestSamzaAppMasterService { @Test def testAppMasterDashboardShouldStart { val config = getDummyConfig - val state = new SamzaAppState(JobCoordinator(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) + val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) val service = new SamzaAppMasterService(config, state, null, null) val taskName = new TaskName("test") @@ -73,7 +73,7 @@ class TestSamzaAppMasterService { def testAppMasterDashboardWebServiceShouldStart { // Create some dummy config val config = getDummyConfig - val state = new SamzaAppState(JobCoordinator(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) + val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2) val service = new SamzaAppMasterService(config, state, null, null) // start the dashboard
