http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 6ee741a..a8ba445 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -117,7 +117,7 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.MockClock; import org.apache.tez.dag.app.TaskAttemptEventInfo; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.RootInputInitializerManager; @@ -153,7 +153,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation; -import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; +import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; import org.apache.tez.dag.history.DAGHistoryEvent; @@ -161,7 +161,6 @@ import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; -import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -228,7 +227,7 @@ public class TestVertexImpl { private Map<String, VertexImpl> vertices; private Map<TezVertexID, VertexImpl> vertexIdMap; private DrainDispatcher dispatcher; - private TaskAttemptListener taskAttemptListener; + private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; private Clock clock = new SystemClock(); private TaskHeartbeatHandler thh; private AppContext appContext; @@ -2077,16 +2076,16 @@ public class TestVertexImpl { if (useCustomInitializer) { if (customInitializer == null) { v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, clock, thh, appContext, locationHint, dispatcher, updateTracker); } else { v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker); } } else { v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption, updateTracker); } @@ -2162,7 +2161,7 @@ public class TestVertexImpl { appContext = mock(AppContext.class); thh = mock(TaskHeartbeatHandler.class); historyEventHandler = mock(HistoryEventHandler.class); - TaskSchedulerEventHandler taskScheduler = mock(TaskSchedulerEventHandler.class); + TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class); UserGroupInformation ugi; try { ugi = UserGroupInformation.getCurrentUser(); @@ -3227,7 +3226,7 @@ public class TestVertexImpl { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); containers.addContainerIfNew(container, 0, 0, 0); doReturn(containers).when(appContext).getAllContainers(); @@ -3262,7 +3261,7 @@ public class TestVertexImpl { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); containers.addContainerIfNew(container, 0, 0, 0); doReturn(containers).when(appContext).getAllContainers(); @@ -3298,7 +3297,7 @@ public class TestVertexImpl { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); containers.addContainerIfNew(container, 0, 0, 0); doReturn(containers).when(appContext).getAllContainers(); @@ -5179,7 +5178,7 @@ public class TestVertexImpl { vId = TezVertexID.getInstance(invalidDagId, 1); VertexPlan vPlan = invalidDagPlan.getVertex(0); VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, updateTracker); v.setInputVertices(new HashMap()); @@ -5208,7 +5207,7 @@ public class TestVertexImpl { VertexPlan vertexPlan, String vertexName, Configuration conf, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, VertexLocationHint vertexLocationHint, @@ -5216,7 +5215,7 @@ public class TestVertexImpl { InputInitializer presetInitializer, StateChangeNotifier updateTracker) { super(vertexId, vertexPlan, vertexName, conf, eventHandler, - taskAttemptListener, clock, thh, true, + taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, updateTracker); this.presetInitializer = presetInitializer; @@ -5248,14 +5247,14 @@ public class TestVertexImpl { VertexPlan vertexPlan, String vertexName, Configuration conf, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, StateChangeNotifier updateTracker) { super(vertexId, vertexPlan, vertexName, conf, eventHandler, - taskAttemptListener, clock, thh, true, + taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, updateTracker); this.dispatcher = dispatcher;
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java index 0e34f68..8bd288a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java @@ -54,7 +54,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.DAGAppMaster; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.StateChangeNotifier; @@ -517,7 +517,7 @@ public class TestVertexImpl2 { vertex = new VertexImpl(TezVertexID.fromString("vertex_1418197758681_0001_1_00"), vertexPlan, - "testvertex", conf, mock(EventHandler.class), mock(TaskAttemptListener.class), + "testvertex", conf, mock(EventHandler.class), mock(TaskCommunicatorManagerInterface.class), mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext, VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null, new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class)); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java index 0f532fb..e389d64 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java @@ -55,7 +55,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.Task; @@ -434,7 +434,7 @@ public class TestVertexRecovery { DAGPlan dagPlan = createDAGPlan(); dag = new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskAttemptListener.class), + dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), new Credentials(), new SystemClock(), user, mock(TaskHeartbeatHandler.class), mockAppContext); when(mockAppContext.getCurrentDAG()).thenReturn(dag); @@ -544,7 +544,7 @@ public class TestVertexRecovery { DAGPlan dagPlan = createDAGPlanSingleVertex(); dag = new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskAttemptListener.class), + dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), new Credentials(), new SystemClock(), user, mock(TaskHeartbeatHandler.class), mockAppContext); when(mockAppContext.getCurrentDAG()).thenReturn(dag); @@ -924,7 +924,7 @@ public class TestVertexRecovery { DAGPlan dagPlan = createDAGPlanMR(); dag = new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskAttemptListener.class), + dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), new Credentials(), new SystemClock(), user, mock(TaskHeartbeatHandler.class), mockAppContext); when(mockAppContext.getCurrentDAG()).thenReturn(dag); @@ -965,7 +965,7 @@ public class TestVertexRecovery { DAGPlan dagPlan = createDAGPlan(); dag = new DAGImpl(dagId, new Configuration(), dagPlan, - dispatcher.getEventHandler(), mock(TaskAttemptListener.class), + dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class), new Credentials(), new SystemClock(), user, mock(TaskHeartbeatHandler.class), mockAppContext); when(mockAppContext.getCurrentDAG()).thenReturn(dag); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java new file mode 100644 index 0000000..4b931d4 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java @@ -0,0 +1,359 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestContainerLauncherManager { + + @Before + @After + public void reset() { + ContainerLaucherRouterForMultipleLauncherTest.reset(); + } + + @Test(timeout = 5000) + public void testNoLaunchersSpecified() throws IOException { + + AppContext appContext = mock(AppContext.class); + TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); + + try { + + new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, null, + false); + fail("Expecting a failure without any launchers being specified"); + } catch (IllegalArgumentException e) { + + } + } + + @Test(timeout = 5000) + public void testCustomLauncherSpecified() throws IOException { + Configuration conf = new Configuration(false); + + AppContext appContext = mock(AppContext.class); + TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); + + String customLauncherName = "customLauncher"; + List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>(); + ByteBuffer bb = ByteBuffer.allocate(4); + bb.putInt(0, 3); + UserPayload customPayload = UserPayload.create(bb); + launcherDescriptors.add( + new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName()) + .setUserPayload(customPayload)); + + ContainerLaucherRouterForMultipleLauncherTest clr = + new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, + launcherDescriptors, + true); + try { + clr.init(conf); + clr.start(); + + assertEquals(1, clr.getNumContainerLaunchers()); + assertFalse(clr.getYarnContainerLauncherCreated()); + assertFalse(clr.getUberContainerLauncherCreated()); + assertEquals(customLauncherName, clr.getContainerLauncherName(0)); + assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload()); + } finally { + clr.stop(); + } + } + + @Test(timeout = 5000) + public void testMultipleContainerLaunchers() throws IOException { + Configuration conf = new Configuration(false); + conf.set("testkey", "testvalue"); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + + AppContext appContext = mock(AppContext.class); + TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); + + String customLauncherName = "customLauncher"; + List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>(); + ByteBuffer bb = ByteBuffer.allocate(4); + bb.putInt(0, 3); + UserPayload customPayload = UserPayload.create(bb); + launcherDescriptors.add( + new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName()) + .setUserPayload(customPayload)); + launcherDescriptors + .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(userPayload)); + + ContainerLaucherRouterForMultipleLauncherTest clr = + new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, + launcherDescriptors, + true); + try { + clr.init(conf); + clr.start(); + + assertEquals(2, clr.getNumContainerLaunchers()); + assertTrue(clr.getYarnContainerLauncherCreated()); + assertFalse(clr.getUberContainerLauncherCreated()); + assertEquals(customLauncherName, clr.getContainerLauncherName(0)); + assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload()); + + assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1)); + Configuration confParsed = TezUtils + .createConfFromUserPayload(clr.getContainerLauncherContext(1).getInitialUserPayload()); + assertEquals("testvalue", confParsed.get("testkey")); + } finally { + clr.stop(); + } + } + + @Test(timeout = 5000) + public void testEventRouting() throws Exception { + Configuration conf = new Configuration(false); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + + AppContext appContext = mock(AppContext.class); + TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); + + String customLauncherName = "customLauncher"; + List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>(); + ByteBuffer bb = ByteBuffer.allocate(4); + bb.putInt(0, 3); + UserPayload customPayload = UserPayload.create(bb); + launcherDescriptors.add( + new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName()) + .setUserPayload(customPayload)); + launcherDescriptors + .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(userPayload)); + + ContainerLaucherRouterForMultipleLauncherTest clr = + new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, + launcherDescriptors, + true); + try { + clr.init(conf); + clr.start(); + + assertEquals(2, clr.getNumContainerLaunchers()); + assertTrue(clr.getYarnContainerLauncherCreated()); + assertFalse(clr.getUberContainerLauncherCreated()); + assertEquals(customLauncherName, clr.getContainerLauncherName(0)); + assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1)); + + verify(clr.getTestContainerLauncher(0)).initialize(); + verify(clr.getTestContainerLauncher(0)).start(); + verify(clr.getTestContainerLauncher(1)).initialize(); + verify(clr.getTestContainerLauncher(1)).start(); + + ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class); + Container container1 = mock(Container.class); + + ContainerLaunchContext clc2 = mock(ContainerLaunchContext.class); + Container container2 = mock(Container.class); + + ContainerLauncherLaunchRequestEvent launchRequestEvent1 = + new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0); + ContainerLauncherLaunchRequestEvent launchRequestEvent2 = + new ContainerLauncherLaunchRequestEvent(clc2, container2, 1, 0, 0); + + clr.handle(launchRequestEvent1); + + + ArgumentCaptor<ContainerLaunchRequest> captor = + ArgumentCaptor.forClass(ContainerLaunchRequest.class); + verify(clr.getTestContainerLauncher(0)).launchContainer(captor.capture()); + assertEquals(1, captor.getAllValues().size()); + ContainerLaunchRequest launchRequest1 = captor.getValue(); + assertEquals(clc1, launchRequest1.getContainerLaunchContext()); + + clr.handle(launchRequestEvent2); + captor = ArgumentCaptor.forClass(ContainerLaunchRequest.class); + verify(clr.getTestContainerLauncher(1)).launchContainer(captor.capture()); + assertEquals(1, captor.getAllValues().size()); + ContainerLaunchRequest launchRequest2 = captor.getValue(); + assertEquals(clc2, launchRequest2.getContainerLaunchContext()); + + } finally { + clr.stop(); + verify(clr.getTestContainerLauncher(0)).shutdown(); + verify(clr.getTestContainerLauncher(1)).shutdown(); + } + } + + private static class ContainerLaucherRouterForMultipleLauncherTest + extends ContainerLauncherManager { + + // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor, + // and regular variables will not be initialized at this point. + private static final AtomicInteger numContainerLaunchers = new AtomicInteger(0); + private static final Set<Integer> containerLauncherIndices = new HashSet<>(); + private static final ContainerLauncher yarnContainerLauncher = mock(ContainerLauncher.class); + private static final ContainerLauncher uberContainerlauncher = mock(ContainerLauncher.class); + private static final AtomicBoolean yarnContainerLauncherCreated = new AtomicBoolean(false); + private static final AtomicBoolean uberContainerLauncherCreated = new AtomicBoolean(false); + + private static final List<ContainerLauncherContext> containerLauncherContexts = + new LinkedList<>(); + private static final List<String> containerLauncherNames = new LinkedList<>(); + private static final List<ContainerLauncher> testContainerLaunchers = new LinkedList<>(); + + + public static void reset() { + numContainerLaunchers.set(0); + containerLauncherIndices.clear(); + yarnContainerLauncherCreated.set(false); + uberContainerLauncherCreated.set(false); + containerLauncherContexts.clear(); + containerLauncherNames.clear(); + testContainerLaunchers.clear(); + } + + public ContainerLaucherRouterForMultipleLauncherTest(AppContext context, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, + String workingDirectory, + List<NamedEntityDescriptor> containerLauncherDescriptors, + boolean isPureLocalMode) throws + UnknownHostException { + super(context, taskCommunicatorManagerInterface, workingDirectory, + containerLauncherDescriptors, isPureLocalMode); + } + + @Override + ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor, + AppContext context, + ContainerLauncherContext containerLauncherContext, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, + String workingDirectory, + int containerLauncherIndex, + boolean isPureLocalMode) { + numContainerLaunchers.incrementAndGet(); + boolean added = containerLauncherIndices.add(containerLauncherIndex); + assertTrue("Cannot add multiple launchers with the same index", added); + containerLauncherNames.add(containerLauncherDescriptor.getEntityName()); + containerLauncherContexts.add(containerLauncherContext); + return super + .createContainerLauncher(containerLauncherDescriptor, context, containerLauncherContext, + taskCommunicatorManagerInterface, workingDirectory, containerLauncherIndex, isPureLocalMode); + } + + @Override + ContainerLauncher createYarnContainerLauncher( + ContainerLauncherContext containerLauncherContext) { + yarnContainerLauncherCreated.set(true); + testContainerLaunchers.add(yarnContainerLauncher); + return yarnContainerLauncher; + } + + @Override + ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext, + AppContext context, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, + String workingDirectory, + boolean isPureLocalMode) { + uberContainerLauncherCreated.set(true); + testContainerLaunchers.add(uberContainerlauncher); + return uberContainerlauncher; + } + + @Override + ContainerLauncher createCustomContainerLauncher( + ContainerLauncherContext containerLauncherContext, + NamedEntityDescriptor containerLauncherDescriptor) { + ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher( + containerLauncherContext, containerLauncherDescriptor)); + testContainerLaunchers.add(spyLauncher); + return spyLauncher; + } + + public int getNumContainerLaunchers() { + return numContainerLaunchers.get(); + } + + public boolean getYarnContainerLauncherCreated() { + return yarnContainerLauncherCreated.get(); + } + + public boolean getUberContainerLauncherCreated() { + return uberContainerLauncherCreated.get(); + } + + public String getContainerLauncherName(int containerLauncherIndex) { + return containerLauncherNames.get(containerLauncherIndex); + } + + public ContainerLauncher getTestContainerLauncher(int containerLauncherIndex) { + return testContainerLaunchers.get(containerLauncherIndex); + } + + public ContainerLauncherContext getContainerLauncherContext(int containerLauncherIndex) { + return containerLauncherContexts.get(containerLauncherIndex); + } + } + + private static class FakeContainerLauncher extends ContainerLauncher { + + public FakeContainerLauncher( + ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) { + + } + + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java deleted file mode 100644 index d0caf8c..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.launcher; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; -import org.apache.tez.serviceplugins.api.ContainerLauncher; -import org.apache.tez.serviceplugins.api.ContainerLauncherContext; -import org.apache.tez.serviceplugins.api.ContainerStopRequest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -public class TestContainerLauncherRouter { - - @Before - @After - public void reset() { - ContainerLaucherRouterForMultipleLauncherTest.reset(); - } - - @Test(timeout = 5000) - public void testNoLaunchersSpecified() throws IOException { - - AppContext appContext = mock(AppContext.class); - TaskAttemptListener tal = mock(TaskAttemptListener.class); - - try { - - new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, null, - false); - fail("Expecting a failure without any launchers being specified"); - } catch (IllegalArgumentException e) { - - } - } - - @Test(timeout = 5000) - public void testCustomLauncherSpecified() throws IOException { - Configuration conf = new Configuration(false); - - AppContext appContext = mock(AppContext.class); - TaskAttemptListener tal = mock(TaskAttemptListener.class); - - String customLauncherName = "customLauncher"; - List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>(); - ByteBuffer bb = ByteBuffer.allocate(4); - bb.putInt(0, 3); - UserPayload customPayload = UserPayload.create(bb); - launcherDescriptors.add( - new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName()) - .setUserPayload(customPayload)); - - ContainerLaucherRouterForMultipleLauncherTest clr = - new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, - launcherDescriptors, - true); - try { - clr.init(conf); - clr.start(); - - assertEquals(1, clr.getNumContainerLaunchers()); - assertFalse(clr.getYarnContainerLauncherCreated()); - assertFalse(clr.getUberContainerLauncherCreated()); - assertEquals(customLauncherName, clr.getContainerLauncherName(0)); - assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload()); - } finally { - clr.stop(); - } - } - - @Test(timeout = 5000) - public void testMultipleContainerLaunchers() throws IOException { - Configuration conf = new Configuration(false); - conf.set("testkey", "testvalue"); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - - AppContext appContext = mock(AppContext.class); - TaskAttemptListener tal = mock(TaskAttemptListener.class); - - String customLauncherName = "customLauncher"; - List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>(); - ByteBuffer bb = ByteBuffer.allocate(4); - bb.putInt(0, 3); - UserPayload customPayload = UserPayload.create(bb); - launcherDescriptors.add( - new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName()) - .setUserPayload(customPayload)); - launcherDescriptors - .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(userPayload)); - - ContainerLaucherRouterForMultipleLauncherTest clr = - new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, - launcherDescriptors, - true); - try { - clr.init(conf); - clr.start(); - - assertEquals(2, clr.getNumContainerLaunchers()); - assertTrue(clr.getYarnContainerLauncherCreated()); - assertFalse(clr.getUberContainerLauncherCreated()); - assertEquals(customLauncherName, clr.getContainerLauncherName(0)); - assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload()); - - assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1)); - Configuration confParsed = TezUtils - .createConfFromUserPayload(clr.getContainerLauncherContext(1).getInitialUserPayload()); - assertEquals("testvalue", confParsed.get("testkey")); - } finally { - clr.stop(); - } - } - - @Test(timeout = 5000) - public void testEventRouting() throws Exception { - Configuration conf = new Configuration(false); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - - AppContext appContext = mock(AppContext.class); - TaskAttemptListener tal = mock(TaskAttemptListener.class); - - String customLauncherName = "customLauncher"; - List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>(); - ByteBuffer bb = ByteBuffer.allocate(4); - bb.putInt(0, 3); - UserPayload customPayload = UserPayload.create(bb); - launcherDescriptors.add( - new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName()) - .setUserPayload(customPayload)); - launcherDescriptors - .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(userPayload)); - - ContainerLaucherRouterForMultipleLauncherTest clr = - new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, - launcherDescriptors, - true); - try { - clr.init(conf); - clr.start(); - - assertEquals(2, clr.getNumContainerLaunchers()); - assertTrue(clr.getYarnContainerLauncherCreated()); - assertFalse(clr.getUberContainerLauncherCreated()); - assertEquals(customLauncherName, clr.getContainerLauncherName(0)); - assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1)); - - verify(clr.getTestContainerLauncher(0)).initialize(); - verify(clr.getTestContainerLauncher(0)).start(); - verify(clr.getTestContainerLauncher(1)).initialize(); - verify(clr.getTestContainerLauncher(1)).start(); - - ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class); - Container container1 = mock(Container.class); - - ContainerLaunchContext clc2 = mock(ContainerLaunchContext.class); - Container container2 = mock(Container.class); - - NMCommunicatorLaunchRequestEvent launchRequestEvent1 = - new NMCommunicatorLaunchRequestEvent(clc1, container1, 0, 0, 0); - NMCommunicatorLaunchRequestEvent launchRequestEvent2 = - new NMCommunicatorLaunchRequestEvent(clc2, container2, 1, 0, 0); - - clr.handle(launchRequestEvent1); - - - ArgumentCaptor<ContainerLaunchRequest> captor = - ArgumentCaptor.forClass(ContainerLaunchRequest.class); - verify(clr.getTestContainerLauncher(0)).launchContainer(captor.capture()); - assertEquals(1, captor.getAllValues().size()); - ContainerLaunchRequest launchRequest1 = captor.getValue(); - assertEquals(clc1, launchRequest1.getContainerLaunchContext()); - - clr.handle(launchRequestEvent2); - captor = ArgumentCaptor.forClass(ContainerLaunchRequest.class); - verify(clr.getTestContainerLauncher(1)).launchContainer(captor.capture()); - assertEquals(1, captor.getAllValues().size()); - ContainerLaunchRequest launchRequest2 = captor.getValue(); - assertEquals(clc2, launchRequest2.getContainerLaunchContext()); - - } finally { - clr.stop(); - verify(clr.getTestContainerLauncher(0)).shutdown(); - verify(clr.getTestContainerLauncher(1)).shutdown(); - } - } - - private static class ContainerLaucherRouterForMultipleLauncherTest - extends ContainerLauncherRouter { - - // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor, - // and regular variables will not be initialized at this point. - private static final AtomicInteger numContainerLaunchers = new AtomicInteger(0); - private static final Set<Integer> containerLauncherIndices = new HashSet<>(); - private static final ContainerLauncher yarnContainerLauncher = mock(ContainerLauncher.class); - private static final ContainerLauncher uberContainerlauncher = mock(ContainerLauncher.class); - private static final AtomicBoolean yarnContainerLauncherCreated = new AtomicBoolean(false); - private static final AtomicBoolean uberContainerLauncherCreated = new AtomicBoolean(false); - - private static final List<ContainerLauncherContext> containerLauncherContexts = - new LinkedList<>(); - private static final List<String> containerLauncherNames = new LinkedList<>(); - private static final List<ContainerLauncher> testContainerLaunchers = new LinkedList<>(); - - - public static void reset() { - numContainerLaunchers.set(0); - containerLauncherIndices.clear(); - yarnContainerLauncherCreated.set(false); - uberContainerLauncherCreated.set(false); - containerLauncherContexts.clear(); - containerLauncherNames.clear(); - testContainerLaunchers.clear(); - } - - public ContainerLaucherRouterForMultipleLauncherTest(AppContext context, - TaskAttemptListener taskAttemptListener, - String workingDirectory, - List<NamedEntityDescriptor> containerLauncherDescriptors, - boolean isPureLocalMode) throws - UnknownHostException { - super(context, taskAttemptListener, workingDirectory, - containerLauncherDescriptors, isPureLocalMode); - } - - @Override - ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor, - AppContext context, - ContainerLauncherContext containerLauncherContext, - TaskAttemptListener taskAttemptListener, - String workingDirectory, - int containerLauncherIndex, - boolean isPureLocalMode) { - numContainerLaunchers.incrementAndGet(); - boolean added = containerLauncherIndices.add(containerLauncherIndex); - assertTrue("Cannot add multiple launchers with the same index", added); - containerLauncherNames.add(containerLauncherDescriptor.getEntityName()); - containerLauncherContexts.add(containerLauncherContext); - return super - .createContainerLauncher(containerLauncherDescriptor, context, containerLauncherContext, - taskAttemptListener, workingDirectory, containerLauncherIndex, isPureLocalMode); - } - - @Override - ContainerLauncher createYarnContainerLauncher( - ContainerLauncherContext containerLauncherContext) { - yarnContainerLauncherCreated.set(true); - testContainerLaunchers.add(yarnContainerLauncher); - return yarnContainerLauncher; - } - - @Override - ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext, - AppContext context, - TaskAttemptListener taskAttemptListener, - String workingDirectory, - boolean isPureLocalMode) { - uberContainerLauncherCreated.set(true); - testContainerLaunchers.add(uberContainerlauncher); - return uberContainerlauncher; - } - - @Override - ContainerLauncher createCustomContainerLauncher( - ContainerLauncherContext containerLauncherContext, - NamedEntityDescriptor containerLauncherDescriptor) { - ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher( - containerLauncherContext, containerLauncherDescriptor)); - testContainerLaunchers.add(spyLauncher); - return spyLauncher; - } - - public int getNumContainerLaunchers() { - return numContainerLaunchers.get(); - } - - public boolean getYarnContainerLauncherCreated() { - return yarnContainerLauncherCreated.get(); - } - - public boolean getUberContainerLauncherCreated() { - return uberContainerLauncherCreated.get(); - } - - public String getContainerLauncherName(int containerLauncherIndex) { - return containerLauncherNames.get(containerLauncherIndex); - } - - public ContainerLauncher getTestContainerLauncher(int containerLauncherIndex) { - return testContainerLaunchers.get(containerLauncherIndex); - } - - public ContainerLauncherContext getContainerLauncherContext(int containerLauncherIndex) { - return containerLauncherContexts.get(containerLauncherIndex); - } - } - - private static class FakeContainerLauncher extends ContainerLauncher { - - public FakeContainerLauncher( - ContainerLauncherContext containerLauncherContext) { - super(containerLauncherContext); - } - - @Override - public void launchContainer(ContainerLaunchRequest launchRequest) { - - } - - @Override - public void stopContainer(ContainerStopRequest stopRequest) { - - } - } - -}
