TEZ-2381. Fixes after rebase 04/28. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57745276 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57745276 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57745276 Branch: refs/heads/TEZ-2003 Commit: 57745276e1f8352c9a76693f737096bf6cff7b4c Parents: 9d38581 Author: Siddharth Seth <[email protected]> Authored: Tue Apr 28 13:41:12 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:25:34 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../dag/app/TaskAttemptListenerImpTezDag.java | 17 ++++---- .../app/TestTaskAttemptListenerImplTezDag.java | 44 +++++++++++++++----- 3 files changed, 42 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/57745276/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index f6bc8e7..d42aaf8 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -18,5 +18,6 @@ ALL CHANGES: TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. TEZ-2347. Expose additional information in TaskCommunicatorContext. TEZ-2361. Propagate dag completion to TaskCommunicator. + TEZ-2381. Fixes after rebase 04/28. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/57745276/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 7cdf292..cbaed99 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -17,27 +17,21 @@ package org.apache.tez.dag.app; -import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.ListUtils; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; -import com.google.common.base.Preconditions; -import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +55,6 @@ import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -150,7 +143,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) { if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Using Default Task Communicator"); - return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); + return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Using Default Local Task Communicator"); return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); @@ -173,6 +166,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } } + + @VisibleForTesting + protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) { + return new TezTaskCommunicatorImpl(context); + } + public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException { ContainerId containerId = ConverterUtils.toContainerId(request http://git-wip-us.apache.org/repos/asf/tez/blob/57745276/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index ab9fafe..2208220 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -47,11 +47,9 @@ import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TezException; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.common.ContainerContext; -import org.apache.tez.common.ContainerTask; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.dag.DAG; @@ -82,7 +80,9 @@ import org.mockito.ArgumentCaptor; // TODO TEZ-2003 Rename to TestTezTaskCommunicator public class TestTaskAttemptListenerImplTezDag { private ApplicationId appId; + private ApplicationAttemptId appAttemptId; private AppContext appContext; + Credentials credentials; AMContainerMap amContainerMap; EventHandler eventHandler; DAG dag; @@ -98,11 +98,13 @@ public class TestTaskAttemptListenerImplTezDag { @Before public void setUp() { appId = ApplicationId.newInstance(1000, 1); + appAttemptId = ApplicationAttemptId.newInstance(appId, 1); dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance(appId, 1); vertexID = TezVertexID.getInstance(dagID, 1); taskID = TezTaskID.getInstance(vertexID, 1); taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); + credentials = new Credentials(); amContainerMap = mock(AMContainerMap.class); Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); @@ -118,6 +120,8 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(clock).when(appContext).getClock(); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(credentials).when(appContext).getAppCredentials(); NodeId nodeId = NodeId.newInstance("localhost", 0); AMContainer amContainer = mock(AMContainer.class); Container container = mock(Container.class); @@ -160,7 +164,7 @@ public class TestTaskAttemptListenerImplTezDag { assertEquals(taskSpec, containerTask.getTaskSpec()); // Task unregistered. Should respond to heartbeats - taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0); + taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0); containerTask = tezUmbilical.getTask(containerContext2); assertNull(containerTask); @@ -190,7 +194,7 @@ public class TestTaskAttemptListenerImplTezDag { TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); ContainerId containerId1 = createContainerId(appId, 1); - doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1); + ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); taskAttemptListener.registerRunningContainer(containerId1, 0); containerTask = tezUmbilical.getTask(containerContext1); @@ -320,7 +324,6 @@ public class TestTaskAttemptListenerImplTezDag { int fromEventId, int maxEvents, int nextFromEventId, List<TezEvent> sendEvents) throws IOException, TezException { ContainerId containerId = createContainerId(appId, 1); - long requestId = 0; Vertex vertex = mock(Vertex.class); doReturn(vertex).when(dag).getVertex(vertexID); @@ -328,13 +331,13 @@ public class TestTaskAttemptListenerImplTezDag { TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0); doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents); - taskAttemptListener.registerRunningContainer(containerId); - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId); + taskAttemptListener.registerRunningContainer(containerId, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); + + TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); - TezHeartbeatRequest request = mock(TezHeartbeatRequest.class); doReturn(containerId.toString()).when(request).getContainerIdentifier(); - doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID(); - doReturn(++requestId).when(request).getRequestId(); + doReturn(taskAttemptID).when(request).getTaskAttemptId(); doReturn(events).when(request).getEvents(); doReturn(maxEvents).when(request).getMaxEvents(); doReturn(fromEventId).when(request).getStartIndex(); @@ -348,6 +351,25 @@ public class TestTaskAttemptListenerImplTezDag { return ContainerId.newInstance(appAttemptId, containerIdx); } + private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag { + + public TaskAttemptListenerImplForTest(AppContext context, + TaskHeartbeatHandler thh, + ContainerHeartbeatHandler chh, + JobTokenSecretManager jobTokenSecretManager, + String[] taskCommunicatorClassIdentifiers, + boolean isPureLocalMode) { + super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers, + isPureLocalMode); + } + + @Override + protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) { + return new TezTaskCommunicatorImplForTest(context); + } + + } + private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl { public TezTaskCommunicatorImplForTest(
