Repository: tez Updated Branches: refs/heads/TEZ-2003 7d5f3f8e3 -> 5d6e80367
TEZ-2381. Fixes after rebase 04/28. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5d6e8036 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5d6e8036 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5d6e8036 Branch: refs/heads/TEZ-2003 Commit: 5d6e80367616dbea629ba71fc35bd12c30707164 Parents: 7d5f3f8 Author: Siddharth Seth <[email protected]> Authored: Tue Apr 28 13:41:12 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Apr 28 13:41:12 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../dag/app/TaskAttemptListenerImpTezDag.java | 17 ++++---- .../app/TestTaskAttemptListenerImplTezDag.java | 45 ++++++++++++++------ 3 files changed, 42 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5d6e8036/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index f6bc8e7..d42aaf8 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -18,5 +18,6 @@ ALL CHANGES: TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. TEZ-2347. Expose additional information in TaskCommunicatorContext. TEZ-2361. Propagate dag completion to TaskCommunicator. + TEZ-2381. Fixes after rebase 04/28. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/5d6e8036/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 61bd4ca..c586787 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -17,28 +17,22 @@ package org.apache.tez.dag.app; -import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.ListUtils; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; -import com.google.common.base.Preconditions; -import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +56,6 @@ import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -151,7 +144,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) { if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Using Default Task Communicator"); - return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); + return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Using Default Local Task Communicator"); return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); @@ -174,6 +167,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } } + + @VisibleForTesting + protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) { + return new TezTaskCommunicatorImpl(context); + } + public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException { ContainerId containerId = ConverterUtils.toContainerId(request http://git-wip-us.apache.org/repos/asf/tez/blob/5d6e8036/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index fa62c11..55a546e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -42,11 +42,9 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TezException; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.common.ContainerContext; -import org.apache.tez.common.ContainerTask; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.dag.DAG; @@ -68,7 +66,6 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -77,7 +74,9 @@ import org.mockito.ArgumentCaptor; // TODO TEZ-2003 Rename to TestTezTaskCommunicator public class TestTaskAttemptListenerImplTezDag { private ApplicationId appId; + private ApplicationAttemptId appAttemptId; private AppContext appContext; + Credentials credentials; AMContainerMap amContainerMap; EventHandler eventHandler; DAG dag; @@ -93,11 +92,13 @@ public class TestTaskAttemptListenerImplTezDag { @Before public void setUp() { appId = ApplicationId.newInstance(1000, 1); + appAttemptId = ApplicationAttemptId.newInstance(appId, 1); dag = mock(DAG.class); TezDAGID dagID = TezDAGID.getInstance(appId, 1); vertexID = TezVertexID.getInstance(dagID, 1); taskID = TezTaskID.getInstance(vertexID, 1); taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); + credentials = new Credentials(); amContainerMap = mock(AMContainerMap.class); Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); @@ -109,6 +110,8 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(dag).when(appContext).getCurrentDAG(); doReturn(appAcls).when(appContext).getApplicationACLs(); doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(credentials).when(appContext).getAppCredentials(); NodeId nodeId = NodeId.newInstance("localhost", 0); AMContainer amContainer = mock(AMContainer.class); @@ -150,7 +153,7 @@ public class TestTaskAttemptListenerImplTezDag { assertEquals(taskSpec, containerTask.getTaskSpec()); // Task unregistered. Should respond to heartbeats - taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0); + taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0); containerTask = tezUmbilical.getTask(containerContext2); assertNull(containerTask); @@ -180,7 +183,7 @@ public class TestTaskAttemptListenerImplTezDag { TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); ContainerId containerId1 = createContainerId(appId, 1); - doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1); + ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); taskAttemptListener.registerRunningContainer(containerId1, 0); containerTask = tezUmbilical.getTask(containerContext1); @@ -241,7 +244,6 @@ public class TestTaskAttemptListenerImplTezDag { private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException, TezException { ContainerId containerId = createContainerId(appId, 1); - long requestId = 0; Vertex vertex = mock(Vertex.class); Task task = mock(Task.class); @@ -251,13 +253,13 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(new ArrayList<TezEvent>()).when(task).getTaskAttemptTezEvents(taskAttemptID, 0, 1); - taskAttemptListener.registerRunningContainer(containerId); - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId); + taskAttemptListener.registerRunningContainer(containerId, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); + + TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); - TezHeartbeatRequest request = mock(TezHeartbeatRequest.class); doReturn(containerId.toString()).when(request).getContainerIdentifier(); - doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID(); - doReturn(++requestId).when(request).getRequestId(); + doReturn(taskAttemptID).when(request).getTaskAttemptId(); doReturn(events).when(request).getEvents(); taskAttemptListener.heartbeat(request); @@ -271,6 +273,25 @@ public class TestTaskAttemptListenerImplTezDag { return ContainerId.newInstance(appAttemptId, containerIdx); } + private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag { + + public TaskAttemptListenerImplForTest(AppContext context, + TaskHeartbeatHandler thh, + ContainerHeartbeatHandler chh, + JobTokenSecretManager jobTokenSecretManager, + String[] taskCommunicatorClassIdentifiers, + boolean isPureLocalMode) { + super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers, + isPureLocalMode); + } + + @Override + protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) { + return new TezTaskCommunicatorImplForTest(context); + } + + } + private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl { public TezTaskCommunicatorImplForTest(
