Repository: tez Updated Branches: refs/heads/TEZ-2003 d03e330fa -> 620e095c5
TEZ-2433. Fixes after rebase 05/08. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/620e095c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/620e095c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/620e095c Branch: refs/heads/TEZ-2003 Commit: 620e095c584331b634bb4cbf1973db3bc893db21 Parents: d03e330 Author: Siddharth Seth <[email protected]> Authored: Fri May 8 18:43:16 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri May 8 18:43:16 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/dag/api/TaskHeartbeatResponse.java | 10 ++++++-- .../dag/app/TaskAttemptListenerImpTezDag.java | 26 ++++++++++---------- .../tez/dag/app/TezTaskCommunicatorImpl.java | 9 +++---- .../app/TestTaskAttemptListenerImplTezDag.java | 11 +++------ .../library/common/shuffle/TestFetcher.java | 8 ++---- 6 files changed, 31 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9b2339f..ad167ab 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -22,5 +22,6 @@ ALL CHANGES: TEZ-2388. Send dag identifier as part of the fetcher request string. TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. TEZ-2420. TaskRunner returning before executing the task. + TEZ-2433. Fixes after rebase 05/08 INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java index c82a743..b826e76 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java @@ -22,11 +22,13 @@ import org.apache.tez.runtime.api.impl.TezEvent; public class TaskHeartbeatResponse { private final boolean shouldDie; - private List<TezEvent> events; + private final int nextFromEventId; + private final List<TezEvent> events; - public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events) { + public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId) { this.shouldDie = shouldDie; this.events = events; + this.nextFromEventId = nextFromEventId; } public boolean isShouldDie() { @@ -36,4 +38,8 @@ public class TaskHeartbeatResponse { public List<TezEvent> getEvents() { return events; } + + public int getNextFromEventId() { + return nextFromEventId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index d30919b..1182d54 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -79,7 +79,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; - private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null); + private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0); private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts = new ConcurrentHashMap<TezTaskAttemptID, ContainerId>(); @@ -195,7 +195,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // So - avoiding synchronization. pingContainerHeartbeatHandler(containerId); - List<TezEvent> outEvents = null; + TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null); TezTaskAttemptID taskAttemptID = request.getTaskAttemptId(); if (taskAttemptID != null) { ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID); @@ -217,12 +217,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } List<TezEvent> otherEvents = new ArrayList<TezEvent>(); + // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events + // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) + // to VertexImpl to ensure the events ordering + // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent + // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { final EventType eventType = tezEvent.getEventType(); - if (eventType == EventType.TASK_STATUS_UPDATE_EVENT || - eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) { - context.getEventHandler() - .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent)); + if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { + TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + (TaskStatusUpdateEvent) tezEvent.getEvent()); + context.getEventHandler().handle(taskAttemptEvent); } else { otherEvents.add(tezEvent); } @@ -233,14 +238,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents))); } taskHeartbeatHandler.pinged(taskAttemptID); - outEvents = context + eventInfo = context .getCurrentDAG() .getVertex(taskAttemptID.getTaskID().getVertexID()) - .getTask(taskAttemptID.getTaskID()) .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getMaxEvents()); } - return new TaskHeartbeatResponse(false, outEvents); + return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId()); } public void taskAlive(TezTaskAttemptID taskAttemptId) { taskHeartbeatHandler.pinged(taskAttemptId); @@ -436,8 +440,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements + ", ContainerId not known for this attempt"); } } - - public TaskCommunicator getTaskCommunicator() { - return taskCommunicators[0]; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 825a4d2..34c8822 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -362,13 +362,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { request.getMaxEvents()); tResponse = taskCommunicatorContext.heartbeat(tRequest); } - TezHeartbeatResponse response; - if (tResponse == null) { - response = new TezHeartbeatResponse(); - } else { - response = new TezHeartbeatResponse(tResponse.getEvents()); - } + TezHeartbeatResponse response = new TezHeartbeatResponse(); response.setLastRequestId(requestId); + response.setEvents(tResponse.getEvents()); + response.setNextFromEventId(tResponse.getNextFromEventId()); containerInfo.lastRequestId = requestId; containerInfo.lastResponse = response; return response; http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index ae9ebc0..5924fc1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -44,6 +44,7 @@ import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TaskHeartbeatRequest; +import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezTaskUmbilicalProtocol; @@ -61,14 +62,11 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.events.DataMovementEvent; -import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -252,14 +250,13 @@ public class TestTaskAttemptListenerImplTezDag { public void testTaskHeartbeatResponse() throws Exception { List<TezEvent> events = new ArrayList<TezEvent>(); List<TezEvent> eventsToSend = new ArrayList<TezEvent>(); - TezHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend); + TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend); assertEquals(2, response.getNextFromEventId()); - assertEquals(1, response.getLastRequestId()); assertEquals(eventsToSend, response.getEvents()); } - private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events, + private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events, int fromEventId, int maxEvents, int nextFromEventId, List<TezEvent> sendEvents) throws IOException, TezException { ContainerId containerId = createContainerId(appId, 1); @@ -274,7 +271,7 @@ public class TestTaskAttemptListenerImplTezDag { taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); - + doReturn(containerId.toString()).when(request).getContainerIdentifier(); doReturn(containerId.toString()).when(request).getContainerIdentifier(); doReturn(taskAttemptID).when(request).getTaskAttemptId(); doReturn(events).when(request).getEvents(); http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index d2b0bde..1a7d628 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -39,11 +38,8 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.api.ExecutionContext; -import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; @@ -87,7 +83,7 @@ public class TestFetcher { // when enabled and hostname does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT); builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -103,7 +99,7 @@ public class TestFetcher { // when enabled and port does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT); + ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT); builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build());
