Merge branch 'TEZ-3334' into TEZ-3334-MERGE1
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/251ca1c3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/251ca1c3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/251ca1c3 Branch: refs/heads/master Commit: 251ca1c3600f5ecbcf65eb8f367988c44c86a97e Parents: 651257f 8e85c46 Author: Jonathan Eagles <[email protected]> Authored: Fri May 19 14:56:37 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri May 19 14:56:37 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 2 + .../apache/tez/common/DagContainerLauncher.java | 43 ++++++++++++++++++++ .../app/launcher/ContainerLauncherWrapper.java | 8 ++-- .../tez/dag/app/launcher/DagDeleteRunnable.java | 29 +++++++++---- .../tez/dag/app/launcher/DeletionTracker.java | 6 +-- .../dag/app/launcher/DeletionTrackerImpl.java | 24 +++++++---- .../app/launcher/LocalContainerLauncher.java | 25 ++++-------- .../app/launcher/TezContainerLauncherImpl.java | 12 +++--- .../app/rm/container/AMContainerHelpers.java | 4 +- .../dag/app/rm/container/AMContainerImpl.java | 6 ++- .../dag/app/rm/container/AMContainerMap.java | 11 +++-- .../dag/app/rm/container/TestAMContainer.java | 4 +- .../app/rm/container/TestAMContainerMap.java | 4 +- 13 files changed, 117 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 31a5d92,f959a7c..cef1c30 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@@ -158,10 -159,8 +158,8 @@@ public class AMContainerHelpers ContainerLaunchContext commonContainerSpec = null; synchronized (commonContainerSpecLock) { if (!commonContainerSpecs.containsKey(tezDAGID)) { - String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); commonContainerSpec = - createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService); + createCommonContainerLaunchContext(acls, credentials, localResources, auxiliaryService); commonContainerSpecs.put(tezDAGID, commonContainerSpec); } else { commonContainerSpec = commonContainerSpecs.get(tezDAGID); http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 18e72a7,8e7df32..02243b8 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@@ -335,19 -329,8 +336,20 @@@ public class AMContainerImpl implement this.schedulerId = schedulerId; this.launcherId = launcherId; this.taskCommId = taskCommId; - this.stateMachine = stateMachineFactory.make(this); + this.auxiliaryService = auxiliaryService; + this.stateMachine = new StateMachineTez<>(stateMachineFactory.make(this), this); + augmentStateMachine(); + } + + + private void augmentStateMachine() { + stateMachine + .registerStateEnteredCallback(AMContainerState.STOP_REQUESTED, + NON_RUNNING_STATE_ENTERED_CALLBACK) + .registerStateEnteredCallback(AMContainerState.STOPPING, + NON_RUNNING_STATE_ENTERED_CALLBACK) + .registerStateEnteredCallback(AMContainerState.COMPLETED, + NON_RUNNING_STATE_ENTERED_CALLBACK); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java ---------------------------------------------------------------------- diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java index 050ffb6,1b2fe16..15338e3 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java @@@ -44,8 -42,8 +45,9 @@@ public class AMContainerMap extends Abs private final TaskCommunicatorManagerInterface tal; private final AppContext context; private final ContainerSignatureMatcher containerSignatureMatcher; - private final ConcurrentHashMap<ContainerId, AMContainer> containerMap; + @VisibleForTesting + final ConcurrentHashMap<ContainerId, AMContainer> containerMap; + private String auxiliaryService; public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher containerSignatureMatcher, AppContext context) { @@@ -68,23 -68,11 +72,22 @@@ } public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) { - AMContainer amc = new AMContainerImpl(container, chh, tal, - containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService); + AMContainer amc = createAmContainer(container, chh, tal, - containerSignatureMatcher, context, schedulerId, launcherId, taskCommId); - ++ containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService); return (containerMap.putIfAbsent(container.getId(), amc) == null); } + AMContainer createAmContainer(Container container, + ContainerHeartbeatHandler chh, + TaskCommunicatorManagerInterface tal, + ContainerSignatureMatcher signatureMatcher, + AppContext appContext, int schedulerId, - int launcherId, int taskCommId) { ++ int launcherId, int taskCommId, String auxiliaryService) { + AMContainer amc = new AMContainerImpl(container, chh, tal, - signatureMatcher, appContext, schedulerId, launcherId, taskCommId); ++ signatureMatcher, appContext, schedulerId, launcherId, taskCommId, auxiliaryService); + return amc; + } + public AMContainer get(ContainerId containerId) { return containerMap.get(containerId); } http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --cc tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 1b9df99,65883ee..d3614d9 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@@ -63,8 -63,8 +63,9 @@@ import org.apache.hadoop.yarn.event.Eve import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; + import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.TaskCommunicatorWrapper; +import org.apache.tez.dag.app.rm.node.AMNodeEventType; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java ---------------------------------------------------------------------- diff --cc tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java index efea327,2fcd0c8..d16881c --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java @@@ -1,4 -1,4 +1,4 @@@ --/** ++/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@@ -48,104 -39,35 +48,104 @@@ import org.junit.Test public class TestAMContainerMap { - private ContainerHeartbeatHandler mockContainerHeartBeatHandler() { - return mock(ContainerHeartbeatHandler.class); - } - private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException { - TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress(); - doReturn(taskComm).when(tal).getTaskCommunicator(0); - return tal; - } + @Test (timeout = 10000) + public void testCleanupOnDagComplete() { - private AppContext mockAppContext() { + ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class); + TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); AppContext appContext = mock(AppContext.class); - return appContext; - } - @SuppressWarnings("deprecation") - private ContainerId mockContainerId(int cId) { - ApplicationId appId = ApplicationId.newInstance(1000, 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - ContainerId containerId = ContainerId.newInstance(appAttemptId, cId); - return containerId; + + + int numContainers = 7; + WrappedContainer[] wContainers = new WrappedContainer[numContainers]; + for (int i = 0 ; i < numContainers ; i++) { + WrappedContainer wc = + new WrappedContainer(false, null, i); + wContainers[i] = wc; + } + + AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock( + ContainerSignatureMatcher.class), appContext, wContainers); + + for (int i = 0 ; i < numContainers ; i++) { + amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0); + } + + + // Container 1 in LAUNCHING state + wContainers[0].launchContainer(); + wContainers[0].verifyState(AMContainerState.LAUNCHING); + + // Container 2 in IDLE state + wContainers[1].launchContainer(); + wContainers[1].containerLaunched(); + wContainers[1].verifyState(AMContainerState.IDLE); + + // Container 3 RUNNING state + wContainers[2].launchContainer(); + wContainers[2].containerLaunched(); + wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID); + wContainers[2].verifyState(AMContainerState.RUNNING); + + // Cointainer 4 STOP_REQUESTED + wContainers[3].launchContainer(); + wContainers[3].containerLaunched(); + wContainers[3].stopRequest(); + wContainers[3].verifyState(AMContainerState.STOP_REQUESTED); + + // Container 5 STOPPING + wContainers[4].launchContainer(); + wContainers[4].containerLaunched(); + wContainers[4].stopRequest(); + wContainers[4].nmStopSent(); + wContainers[4].verifyState(AMContainerState.STOPPING); + + // Container 6 COMPLETED + wContainers[5].launchContainer(); + wContainers[5].containerLaunched(); + wContainers[5].stopRequest(); + wContainers[5].nmStopSent(); + wContainers[5].containerCompleted(); + wContainers[5].verifyState(AMContainerState.COMPLETED); + + // Container 7 STOP_REQUESTED + ERROR + wContainers[6].launchContainer(); + wContainers[6].containerLaunched(); + wContainers[6].containerLaunched(); + assertTrue(wContainers[6].amContainer.isInErrorState()); + wContainers[6].verifyState(AMContainerState.STOP_REQUESTED); + + // 7 containers present, and registered with AMContainerMap at this point. + + assertEquals(7, amContainerMap.containerMap.size()); + amContainerMap.dagComplete(mock(DAG.class)); + assertEquals(5, amContainerMap.containerMap.size()); } - private Container mockContainer(ContainerId containerId) { - NodeId nodeId = NodeId.newInstance("localhost", 43255); - Container container = Container.newInstance(containerId, nodeId, "localhost:33333", - Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class)); - return container; + private static class AMContainerMapForTest extends AMContainerMap { + + + private WrappedContainer[] wrappedContainers; + + public AMContainerMapForTest(ContainerHeartbeatHandler chh, + TaskCommunicatorManagerInterface tal, + ContainerSignatureMatcher containerSignatureMatcher, + AppContext context, WrappedContainer[] wrappedContainers) { + super(chh, tal, containerSignatureMatcher, context); + this.wrappedContainers = wrappedContainers; + } + + @Override + AMContainer createAmContainer(Container container, + ContainerHeartbeatHandler chh, + TaskCommunicatorManagerInterface tal, + ContainerSignatureMatcher signatureMatcher, + AppContext appContext, int schedulerId, - int launcherId, int taskCommId) { ++ int launcherId, int taskCommId, String auxiliaryService) { + return wrappedContainers[container.getId().getId()].amContainer; + } + } }
