TEZ-2325. Route status update event directly to the attempt. (Prakash Ramachandran via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c3232d0b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c3232d0b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c3232d0b Branch: refs/heads/TEZ-2003 Commit: c3232d0b748d3fdff6676b403277013ebf9f3e32 Parents: aa87a14 Author: Hitesh Shah <[email protected]> Authored: Mon Apr 27 15:47:07 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Apr 27 15:47:07 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/TaskAttemptListenerImpTezDag.java | 46 ++++- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 18 -- .../app/TestTaskAttemptListenerImplTezDag.java | 176 ++++++++++++++----- 4 files changed, 177 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e42a79e..36e1767 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2325. Route status update event directly to the attempt. TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task. TEZ-2342. TestFaultTolerance.testRandomFailingTasks fails due to timeout. TEZ-2362. State Change Notifier Thread should be stopped when dag is http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index b64283b..d96da83 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -22,12 +22,20 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.collections4.ListUtils; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -413,10 +421,22 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements LOG.debug("Ping from " + taskAttemptID.toString() + " events: " + (inEvents != null? inEvents.size() : -1)); } - if(inEvents!=null && !inEvents.isEmpty()) { + + List<TezEvent> otherEvents = new ArrayList<TezEvent>(); + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + final EventType eventType = tezEvent.getEventType(); + if (eventType == EventType.TASK_STATUS_UPDATE_EVENT || + eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) { + context.getEventHandler() + .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent)); + } else { + otherEvents.add(tezEvent); + } + } + if(!otherEvents.isEmpty()) { TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); context.getEventHandler().handle( - new VertexEventRouteEvent(vertexId, inEvents)); + new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents))); } taskHeartbeatHandler.pinged(taskAttemptID); List<TezEvent> outEvents = context @@ -433,6 +453,28 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } + private TaskAttemptEvent getTaskAttemptEventFromTezEvent(TezTaskAttemptID taskAttemptID, + TezEvent tezEvent) { + final EventType eventType = tezEvent.getEventType(); + TaskAttemptEvent taskAttemptEvent; + switch (eventType) { + case TASK_STATUS_UPDATE_EVENT: + { + taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + (TaskStatusUpdateEvent) tezEvent.getEvent()); + } + break; + case TASK_ATTEMPT_COMPLETED_EVENT: + { + taskAttemptEvent = new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE); + } + break; + default: + throw new TezUncheckedException("unknown event type " + eventType); + } + return taskAttemptEvent; + } + private Map<String, TezLocalResource> convertLocalResourceMap(Map<String, LocalResource> ylrs) throws IOException { Map<String, TezLocalResource> tlrs = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index dfa358d..c4619a0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -4096,24 +4096,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, srcEdge.sendTezEventToSourceTasks(tezEvent); } break; - case TASK_STATUS_UPDATE_EVENT: - { - checkEventSourceMetadata(vertex, sourceMeta); - TaskStatusUpdateEvent sEvent = - (TaskStatusUpdateEvent) tezEvent.getEvent(); - vertex.getEventHandler().handle( - new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(), - sEvent)); - } - break; - case TASK_ATTEMPT_COMPLETED_EVENT: - { - checkEventSourceMetadata(vertex, sourceMeta); - vertex.getEventHandler().handle( - new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), - TaskAttemptEventType.TA_DONE)); - } - break; case TASK_ATTEMPT_FAILED_EVENT: { checkEventSourceMetadata(vertex, sourceMeta); http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 1f5d9bb..b0ff0e3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -20,53 +20,97 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +@SuppressWarnings("unchecked") public class TestTaskAttemptListenerImplTezDag { + private ApplicationId appId; + private AppContext appContext; + AMContainerMap amContainerMap; + EventHandler eventHandler; + DAG dag; + TaskAttemptListenerImpTezDag taskAttemptListener; + ContainerTask containerTask; + AMContainerTask amContainerTask; + TaskSpec taskSpec; - @Test(timeout = 5000) - public void testGetTask() throws IOException { - ApplicationId appId = ApplicationId.newInstance(1000, 1); - AppContext appContext = mock(AppContext.class); - EventHandler eventHandler = mock(EventHandler.class); - DAG dag = mock(DAG.class); - AMContainerMap amContainerMap = mock(AMContainerMap.class); + TezVertexID vertexID; + TezTaskID taskID; + TezTaskAttemptID taskAttemptID; + + @Before + public void setUp() { + appId = ApplicationId.newInstance(1000, 1); + dag = mock(DAG.class); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + vertexID = TezVertexID.getInstance(dagID, 1); + taskID = TezTaskID.getInstance(vertexID, 1); + taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); + + amContainerMap = mock(AMContainerMap.class); Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); + + eventHandler = mock(EventHandler.class); + + appContext = mock(AppContext.class); doReturn(eventHandler).when(appContext).getEventHandler(); doReturn(dag).when(appContext).getCurrentDAG(); doReturn(appAcls).when(appContext).getApplicationACLs(); doReturn(amContainerMap).when(appContext).getAllContainers(); - TaskAttemptListenerImpTezDag taskAttemptListener = - new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class), - mock(ContainerHeartbeatHandler.class), null); + taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); + taskSpec = mock(TaskSpec.class); + doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); + amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0); + containerTask = null; + } - TaskSpec taskSpec = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); - AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0); - ContainerTask containerTask = null; - + @Test(timeout = 5000) + public void testGetTask() throws IOException { ContainerId containerId1 = createContainerId(appId, 1); doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1); @@ -74,7 +118,6 @@ public class TestTaskAttemptListenerImplTezDag { containerTask = taskAttemptListener.getTask(containerContext1); assertTrue(containerTask.shouldDie()); - ContainerId containerId2 = createContainerId(appId, 2); doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId2); ContainerContext containerContext2 = new ContainerContext(containerId2.toString()); @@ -89,7 +132,7 @@ public class TestTaskAttemptListenerImplTezDag { assertEquals(taskSpec, containerTask.getTaskSpec()); // Task unregistered. Should respond to heartbeats - taskAttemptListener.unregisterTaskAttempt(taskAttemptId); + taskAttemptListener.unregisterTaskAttempt(taskAttemptID); containerTask = taskAttemptListener.getTask(containerContext2); assertNull(containerTask); @@ -115,29 +158,6 @@ public class TestTaskAttemptListenerImplTezDag { @Test(timeout = 5000) public void testGetTaskMultiplePulls() throws IOException { - ApplicationId appId = ApplicationId.newInstance(1000, 1); - AppContext appContext = mock(AppContext.class); - EventHandler eventHandler = mock(EventHandler.class); - DAG dag = mock(DAG.class); - AMContainerMap amContainerMap = mock(AMContainerMap.class); - Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); - doReturn(eventHandler).when(appContext).getEventHandler(); - doReturn(dag).when(appContext).getCurrentDAG(); - doReturn(appAcls).when(appContext).getApplicationACLs(); - doReturn(amContainerMap).when(appContext).getAllContainers(); - - TaskAttemptListenerImpTezDag taskAttemptListener = - new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class), - mock(ContainerHeartbeatHandler.class), null); - - - TaskSpec taskSpec = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); - AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0); - ContainerTask containerTask = null; - - ContainerId containerId1 = createContainerId(appId, 1); doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1); ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); @@ -156,10 +176,78 @@ public class TestTaskAttemptListenerImplTezDag { assertNull(containerTask); } - private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { + @Test (timeout = 5000) + public void testTaskEventRouting() throws Exception { + List<TezEvent> events = Arrays.asList( + new TezEvent(InputInitializerEvent.create("test_vertex", "test_input", null), null), + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null) + ); + + EventHandler eventHandler = generateHeartbeat(events); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(arg.capture()); + final List<Event> argAllValues = arg.getAllValues(); + + final Event statusUpdateEvent = argAllValues.get(0); + assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, + statusUpdateEvent.getType()); + + + final Event vertexEvent = argAllValues.get(1); + final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; + assertEquals("Other events should be routed to vertex", VertexEventType.V_ROUTE_EVENT, + vertexEvent.getType()); + assertEquals(EventType.ROOT_INPUT_INITIALIZER_EVENT, + vertexRouteEvent.getEvents().get(0).getEventType()); + } + + @Test (timeout = 5000) + public void testTaskEventRoutingTaskAttemptOnly() throws Exception { + List<TezEvent> events = Arrays.asList( + new TezEvent(new TaskAttemptCompletedEvent(), null) + ); + final EventHandler eventHandler = generateHeartbeat(events); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(arg.capture()); + final List<Event> argAllValues = arg.getAllValues(); + + final Event statusUpdateEvent = argAllValues.get(0); + assertEquals("only event should be task done", TaskAttemptEventType.TA_DONE, + statusUpdateEvent.getType()); + } + + private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException, TezException { + ContainerId containerId = createContainerId(appId, 1); + long requestId = 0; + Vertex vertex = mock(Vertex.class); + Task task = mock(Task.class); + + doReturn(vertex).when(dag).getVertex(vertexID); + doReturn("test_vertex").when(vertex).getName(); + doReturn(task).when(vertex).getTask(taskID); + + doReturn(new ArrayList<TezEvent>()).when(task).getTaskAttemptTezEvents(taskAttemptID, 0, 1); + + taskAttemptListener.registerRunningContainer(containerId); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId); + + TezHeartbeatRequest request = mock(TezHeartbeatRequest.class); + doReturn(containerId.toString()).when(request).getContainerIdentifier(); + doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID(); + doReturn(++requestId).when(request).getRequestId(); + doReturn(events).when(request).getEvents(); + + taskAttemptListener.heartbeat(request); + + return eventHandler; + } + + + private ContainerId createContainerId(ApplicationId applicationId, long containerIdx) { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); - ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx); - return containerId; + return ContainerId.newContainerId(appAttemptId, containerIdx); } private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag {
