TEZ-2789. Backport events added in TEZ-2612 to branch-0.7 (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bc56ca31 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bc56ca31 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bc56ca31 Branch: refs/heads/branch-0.7 Commit: bc56ca3157973971b7e0e21ed834d56ecc7cdd46 Parents: 9be8cd4 Author: Bikas Saha <[email protected]> Authored: Wed Sep 9 13:54:23 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Wed Sep 9 13:54:23 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 4 + .../dag/app/TaskAttemptListenerImpTezDag.java | 19 ++- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 3 + .../dag/event/TaskAttemptEventStatusUpdate.java | 9 ++ .../dag/app/dag/event/TaskEventTAUpdate.java | 14 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 13 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 138 +++++++++++++++++-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 61 ++++++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 27 +++- .../tez/dag/app/dag/impl/VertexManager.java | 3 +- .../tez/dag/app/rm/container/AMContainer.java | 1 + .../dag/app/rm/container/AMContainerImpl.java | 12 ++ .../events/TaskAttemptFinishedEvent.java | 32 ++++- .../history/events/TaskAttemptStartedEvent.java | 52 +++++-- .../VertexRecoverableEventsGeneratedEvent.java | 3 +- .../impl/HistoryEventJsonConversion.java | 9 ++ .../apache/tez/dag/history/utils/DAGUtils.java | 24 ++++ tez-dag/src/main/proto/HistoryEvents.proto | 10 ++ .../apache/tez/dag/app/MockDAGAppMaster.java | 10 +- .../tez/dag/app/TestMockDAGAppMaster.java | 6 +- .../app/TestTaskAttemptListenerImplTezDag.java | 38 ++++- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 89 ++++++++++++ .../app/dag/impl/TestTaskAttemptRecovery.java | 27 +++- .../tez/dag/app/dag/impl/TestTaskImpl.java | 41 +++++- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 46 +++---- 
.../tez/dag/app/dag/impl/TestVertexImpl.java | 31 ++++- .../dag/app/rm/container/TestAMContainer.java | 5 +- .../TestHistoryEventsProtoConversion.java | 30 +++- .../impl/TestHistoryEventJsonConversion.java | 4 +- .../ats/HistoryEventTimelineConversion.java | 11 +- .../ats/TestHistoryEventTimelineConversion.java | 31 ++++- .../apache/tez/runtime/api/impl/TezEvent.java | 17 +++ 33 files changed, 710 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 812004c..90935c7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2789. Backport events added in TEZ-2612 to branch-0.7 TEZ-2766. Tez UI: Add vertex in-progress info in DAG details TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting down an AM. 
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index a85dbd9..f786a4e 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -58,6 +58,8 @@ public class ATSConstants { public static final String VERTEX_NAME = "vertexName"; public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping"; public static final String SCHEDULED_TIME = "scheduledTime"; + public static final String CREATION_TIME = "creationTime"; + public static final String ALLOCATION_TIME = "allocationTime"; public static final String INIT_REQUESTED_TIME = "initRequestedTime"; public static final String INIT_TIME = "initTime"; public static final String START_REQUESTED_TIME = "startRequestedTime"; @@ -82,7 +84,9 @@ public class ATSConstants { public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL"; public static final String COMPLETED_LOGS_URL = "completedLogsURL"; public static final String EXIT_STATUS = "exitStatus"; + public static final String LAST_DATA_EVENTS = "lastDataEvents"; public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers"; + public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt"; /* Counters-related keys */ public static final String COUNTER_GROUPS = "counterGroups"; http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 
ddbee59..5ef89f6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.collections4.ListUtils; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; import org.slf4j.Logger; @@ -428,22 +427,34 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements " events: " + (inEvents != null? inEvents.size() : -1)); } + long currTime = context.getClock().getTime(); List<TezEvent> otherEvents = new ArrayList<TezEvent>(); // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) // to VertexImpl to ensure the events ordering // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent + TaskAttemptEventStatusUpdate taskAttemptEvent = null; + boolean readErrorReported = false; for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + // for now, set the event time on the AM when it is received. + // this avoids any time disparity between machines. 
+ tezEvent.setEventReceivedTime(currTime); final EventType eventType = tezEvent.getEventType(); if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { - TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, - (TaskStatusUpdateEvent) tezEvent.getEvent()); - context.getEventHandler().handle(taskAttemptEvent); + taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + (TaskStatusUpdateEvent) tezEvent.getEvent()); } else { + if (eventType == EventType.INPUT_READ_ERROR_EVENT) { + readErrorReported = true; + } otherEvents.add(tezEvent); } } + if (taskAttemptEvent != null) { + taskAttemptEvent.setReadErrorReported(readErrorReported); + context.getEventHandler().handle(taskAttemptEvent); + } if(!otherEvents.isEmpty()) { TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); context.getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 6c85cc2..4360cc3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.TezEvent; /** * Read only view of TaskAttempt. @@ -79,6 +80,8 @@ public interface TaskAttempt { float getProgress(); TaskAttemptState getState(); TaskAttemptState getStateNoLock(); + + void setLastEventSent(TezEvent lastEventSent); /** * Has attempt reached the final state or not. 
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java index c5a6ea7..458679c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java @@ -24,6 +24,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent { private TaskStatusUpdateEvent taskAttemptStatus; + private boolean readErrorReported = false; public TaskAttemptEventStatusUpdate(TezTaskAttemptID id, TaskStatusUpdateEvent statusEvent) { @@ -34,4 +35,12 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent { public TaskStatusUpdateEvent getStatusEvent() { return this.taskAttemptStatus; } + + public void setReadErrorReported(boolean value) { + readErrorReported = value; + } + + public boolean getReadErrorReported() { + return readErrorReported; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java index 59c7363..01eaf5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java @@ -18,19 +18,31 @@ package org.apache.tez.dag.app.dag.event; +import org.apache.tez.common.TezAbstractEvent; import 
org.apache.tez.dag.records.TezTaskAttemptID; +@SuppressWarnings("rawtypes") public class TaskEventTAUpdate extends TaskEvent { private TezTaskAttemptID attemptID; + private TezAbstractEvent causalEvent; public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) { + this(id, type, null); + } + + public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) { super(id.getTaskID(), type); this.attemptID = id; + this.causalEvent = causalEvent; } - + public TezTaskAttemptID getTaskAttemptID() { return attemptID; } + + public TezAbstractEvent getCausalEvent() { + return causalEvent; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index 576e1cf..0be7790 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -382,7 +382,7 @@ public class Edge { EventMetaData srcInfo = tezEvent.getSourceInfo(); for (DataMovementEvent dmEvent : compEvent.getEvents()) { - TezEvent newEvent = new TezEvent(dmEvent, srcInfo); + TezEvent newEvent = new TezEvent(dmEvent, srcInfo, tezEvent.getEventReceivedTime()); sendTezEventToDestinationTasks(newEvent); } } @@ -411,7 +411,7 @@ public class Edge { InputFailedEvent ifEvent = ((InputFailedEvent) event); e = InputFailedEvent.create(inputIndex, ifEvent.getVersion()); } - tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); // cache the event object per input because are unique per input index inputIndicesWithEvents.put(inputIndex, tezEventToSend); @@ -558,7 +558,8 @@ public 
class Edge { DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone], targetIndices[numEventsDone]); numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), + tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); listToAdd.add(tezEventToSend); } @@ -590,7 +591,8 @@ public class Edge { while (numEventsDone < numEvents && listSize++ < listMaxSize) { InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]); numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), + tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); listToAdd.add(tezEventToSend); } @@ -622,7 +624,8 @@ public class Edge { while (numEventsDone < numEvents && listSize++ < listMaxSize) { DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]); numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), + tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); listToAdd.add(tezEventToSend); } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 5792af0..266358d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -87,6 +87,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import 
org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate; import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; +import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -96,6 +97,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; @@ -118,6 +120,37 @@ public class TaskAttemptImpl implements TaskAttempt, private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class); private static final String LINE_SEPARATOR = System .getProperty("line.separator"); + + public static class DataEventDependencyInfo { + long timestamp; + TezTaskAttemptID taId; + public DataEventDependencyInfo(long time, TezTaskAttemptID id) { + this.timestamp = time; + this.taId = id; + } + public long getTimestamp() { + return timestamp; + } + public TezTaskAttemptID getTaskAttemptId() { + return taId; + } + public static DataEventDependencyInfoProto toProto(DataEventDependencyInfo info) { + DataEventDependencyInfoProto.Builder builder = DataEventDependencyInfoProto.newBuilder(); + builder.setTimestamp(info.timestamp); + if (info.taId != null) { + builder.setTaskAttemptId(info.taId.toString()); + } + return builder.build(); + } + + public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto proto) { + TezTaskAttemptID taId = null; + if(proto.hasTaskAttemptId()) { + taId = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); 
+ } + return new DataEventDependencyInfo(proto.getTimestamp(), taId); + } + } static final TezCounters EMPTY_COUNTERS = new TezCounters(); @@ -139,6 +172,7 @@ public class TaskAttemptImpl implements TaskAttempt, // TODO Can these be replaced by the container object TEZ-1037 private Container container; + private long allocationTime; private ContainerId containerId; private NodeId containerNodeId; private String nodeHttpAddress; @@ -148,6 +182,10 @@ public class TaskAttemptImpl implements TaskAttempt, private final Vertex vertex; @VisibleForTesting + boolean appendNextDataEvent = true; + ArrayList<DataEventDependencyInfo> lastDataEvents = Lists.newArrayList(); + + @VisibleForTesting TaskAttemptStatus reportedStatus; private DAGCounter localityCounter; @@ -166,6 +204,9 @@ public class TaskAttemptImpl implements TaskAttempt, private final Resource taskResource; private final ContainerContext containerContext; private final boolean leafVertex; + + private TezTaskAttemptID creationCausalTA; + private long creationTime; protected static final FailedTransitionHelper FAILED_HELPER = new FailedTransitionHelper(); @@ -411,12 +452,22 @@ public class TaskAttemptImpl implements TaskAttempt, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, Task task) { + this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock, + taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex, + task, null); + } + public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, + TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, + TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, + boolean isRescheduled, + Resource resource, ContainerContext containerContext, boolean leafVertex, + Task task, TezTaskAttemptID schedulingCausalTA) { - this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration + MAX_ALLOWED_OUTPUT_FAILURES = 
conf.getInt(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); - this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -431,6 +482,8 @@ public class TaskAttemptImpl implements TaskAttempt, this.appContext = appContext; this.task = task; this.vertex = this.task.getVertex(); + this.creationCausalTA = schedulingCausalTA; + this.creationTime = clock.getTime(); this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); @@ -462,6 +515,10 @@ public class TaskAttemptImpl implements TaskAttempt, public TezDAGID getDAGID() { return getVertexID().getDAGId(); } + + public TezTaskAttemptID getSchedulingCausalTA() { + return creationCausalTA; + } TaskSpec createRemoteTaskSpec() throws AMUserCodeException { TaskSpec baseTaskSpec = task.getBaseTaskSpec(); @@ -660,6 +717,33 @@ public class TaskAttemptImpl implements TaskAttempt, } } + public long getCreationTime() { + readLock.lock(); + try { + return creationTime; + } finally { + readLock.unlock(); + } + } + + public TezTaskAttemptID getCreationCausalAttempt() { + readLock.lock(); + try { + return creationCausalTA; + } finally { + readLock.unlock(); + } + } + + public long getAllocationTime() { + readLock.lock(); + try { + return allocationTime; + } finally { + readLock.unlock(); + } + } + @Override public long getFinishTime() { readLock.lock(); @@ -753,6 +837,9 @@ public class TaskAttemptImpl implements TaskAttempt, { TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent; this.launchTime = tEvent.getStartTime(); + this.creationTime = tEvent.getCreationTime(); + this.allocationTime = tEvent.getAllocationTime(); + 
this.creationCausalTA = tEvent.getCreationCausalTA(); recoveryStartEventSeen = true; recoveredState = TaskAttemptState.RUNNING; this.containerId = tEvent.getContainerId(); @@ -770,6 +857,9 @@ public class TaskAttemptImpl implements TaskAttempt, : TaskAttemptTerminationCause.UNKNOWN_ERROR; this.diagnostics.add(tEvent.getDiagnostics()); this.recoveredState = tEvent.getState(); + if (tEvent.getDataEvents() != null) { + this.lastDataEvents.addAll(tEvent.getDataEvents()); + } sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState())); return recoveredState; } @@ -973,7 +1063,8 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), launchTime, containerId, containerNodeId, - inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); + inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, creationTime, creationCausalTA, + allocationTime); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } @@ -985,7 +1076,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), getFinishTime(), TaskAttemptState.SUCCEEDED, null, - "", getCounters()); + "", getCounters(), lastDataEvents); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1002,7 +1093,7 @@ public class TaskAttemptImpl implements TaskAttempt, finishTime, state, terminationCause, StringUtils.join( - getDiagnostics(), LINE_SEPARATOR), getCounters()); + getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1130,7 +1221,7 @@ public class TaskAttemptImpl implements TaskAttempt, .getTaskAttemptState()); // 
Send out events to the Task - indicating TaskAttemptTermination(F/K) ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper - .getTaskEventType())); + .getTaskEventType(), event)); } } @@ -1140,9 +1231,10 @@ public class TaskAttemptImpl implements TaskAttempt, public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent; - Container container = ta.appContext.getAllContainers() - .get(event.getContainerId()).getContainer(); + AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); + Container container = amContainer.getContainer(); + ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; ta.containerId = event.getContainerId(); ta.containerNodeId = container.getNodeId(); @@ -1268,12 +1360,16 @@ public class TaskAttemptImpl implements TaskAttempt, SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { @Override public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { - TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event) - .getStatusEvent(); + TaskAttemptEventStatusUpdate sEvent = (TaskAttemptEventStatusUpdate) event; + TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent(); ta.reportedStatus.state = ta.getState(); ta.reportedStatus.progress = statusEvent.getProgress(); ta.reportedStatus.counters = statusEvent.getCounters(); ta.statistics = statusEvent.getStatistics(); + if (sEvent.getReadErrorReported()) { + // if there is a read error then track the next last data event + ta.appendNextDataEvent = true; + } ta.updateProgressSplits(); @@ -1518,7 +1614,7 @@ public class TaskAttemptImpl implements TaskAttempt, new EventMetaData(EventProducerConsumerType.SYSTEM, vertex.getName(), edgeVertex.getName(), - getID()))); + getID()), appContext.getClock().getTime())); } sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents)); } @@ -1595,4 +1691,24 @@ 
public class TaskAttemptImpl implements TaskAttempt, public String toString() { return getID().toString(); } + + + @Override + public void setLastEventSent(TezEvent lastEventSent) { + writeLock.lock(); + try { + DataEventDependencyInfo info = new DataEventDependencyInfo( + lastEventSent.getEventReceivedTime(), lastEventSent.getSourceInfo().getTaskAttemptID()); + // task attempt id may be null for input data information events + if (appendNextDataEvent) { + appendNextDataEvent = false; + lastDataEvents.add(info); + } else { + // over-write last event - array list makes it quick + lastDataEvents.set(lastDataEvents.size() - 1, info); + } + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index ef8e33a..e6027f5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -70,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; @@ -549,7 +550,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) { - TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId()); + TaskAttempt taskAttempt = 
createAttempt(tezTaskAttemptID.getId(), null); return taskAttempt; } @@ -814,10 +815,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } - TaskAttemptImpl createAttempt(int attemptNumber) { + TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - (failedAttempts > 0), taskResource, containerContext, leafVertex, this); + (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA); } @Override @@ -834,8 +835,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } // This is always called in the Write Lock - private void addAndScheduleAttempt() { - TaskAttempt attempt = createAttempt(attempts.size()); + private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) { + TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA); if (LOG.isDebugEnabled()) { LOG.debug("Created attempt " + attempt.getID()); } @@ -1048,7 +1049,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event; task.locationHint = scheduleEvent.getTaskLocationHint(); task.baseTaskSpec = scheduleEvent.getBaseTaskSpec(); - task.addAndScheduleAttempt(); + // For now, initial scheduling dependency is due to vertex manager scheduling + task.addAndScheduleAttempt(null); task.scheduledTime = task.clock.getTime(); task.logJobHistoryTaskStartedEvent(); } @@ -1066,7 +1068,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public void transition(TaskImpl task, TaskEvent event) { LOG.info("Scheduling a redundant attempt for task " + task.taskId); task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1); - task.addAndScheduleAttempt(); + TezTaskAttemptID earliestUnfinishedAttempt = null; + for (TaskAttempt ta : task.attempts.values()) { + // 
find the oldest running attempt + if (!ta.isFinished()) { + earliestUnfinishedAttempt = ta.getID(); + } + } + task.addAndScheduleAttempt(earliestUnfinishedAttempt); } } @@ -1143,9 +1152,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // we KillWaitAttemptCompletedTransitionready have a spare task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true); task.getVertex().incrementKilledTaskAttemptCount(); - if (task.getUncompletedAttemptsCount() == 0 - && task.successfulAttempt == null) { - task.addAndScheduleAttempt(); + if (task.shouldScheduleNewAttempt()) { + task.addAndScheduleAttempt(castEvent.getTaskAttemptID()); } } } @@ -1255,7 +1263,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // If any incomplete, the running attempt will moved to failed and its // update will trigger a new attempt if possible if (task.attempts.size() == task.getFinishedAttemptsCount()) { - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(null); } endState = TaskStateInternal.RUNNING; break; @@ -1304,15 +1312,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { return task.getInternalState(); } } + + private boolean shouldScheduleNewAttempt() { + return (getUncompletedAttemptsCount() == 0 + && successfulAttempt == null); + } private static class AttemptFailedTransition implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { + private TezTaskAttemptID schedulingCausalTA; + @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; task.getVertex().incrementFailedTaskAttemptCount(); TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; + schedulingCausalTA = castEvent.getTaskAttemptID(); task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed," + " info=" + task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics()); if (task.commitAttempt != null && @@ -1327,12 +1343,11 @@ public class TaskImpl implements 
Task, EventHandler<TaskEvent> { ((TaskEventTAUpdate) event).getTaskAttemptID(), TaskAttemptStateInternal.FAILED); // we don't need a new event if we already have a spare - if (task.getUncompletedAttemptsCount() == 0 - && task.successfulAttempt == null) { + if (task.shouldScheduleNewAttempt()) { LOG.info("Scheduling new attempt for task: " + task.getTaskId() + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: " + task.maxFailedAttempts); - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(getSchedulingCausalTA()); } } else { LOG.info("Failing task: " + task.getTaskId() @@ -1352,11 +1367,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { protected TaskStateInternal getDefaultState(TaskImpl task) { return task.getInternalState(); } + + protected TezTaskAttemptID getSchedulingCausalTA() { + return schedulingCausalTA; + } } private static class TaskRetroactiveFailureTransition extends AttemptFailedTransition { + private TezTaskAttemptID schedulingCausalTA; + @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { if (task.leafVertex) { @@ -1386,6 +1407,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // succeeded state return TaskStateInternal.SUCCEEDED; } + + Preconditions.checkState(castEvent.getCausalEvent() != null); + TaskAttemptEventOutputFailed destinationEvent = + (TaskAttemptEventOutputFailed) castEvent.getCausalEvent(); + schedulingCausalTA = destinationEvent.getInputFailedEvent().getSourceInfo().getTaskAttemptID(); // super.transition is mostly coded for the case where an // UNcompleted task failed. 
When a COMPLETED task retroactively @@ -1402,6 +1428,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { return returnState; } + + @Override + protected TezTaskAttemptID getSchedulingCausalTA() { + return schedulingCausalTA; + } @Override protected TaskStateInternal getDefaultState(TaskImpl task) { @@ -1433,7 +1464,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(attemptId); return TaskStateInternal.SCHEDULED; } else { // nothing to do http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 7cf12e3..95ee298 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -3446,6 +3446,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl vertex.distanceFromRoot = distanceFromRoot; } vertex.numStartedSourceVertices++; + vertex.startTimeRequested = vertex.clock.getTime(); LOG.info("Source vertex started: " + startEvent.getSourceVertexId() + " for vertex: " + vertex.logIdentifier + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); @@ -3504,12 +3505,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl Preconditions.checkState( (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()), "Vertex: " + vertex.logIdentifier + " got invalid start 
event"); - vertex.startTimeRequested = vertex.clock.getTime(); vertex.startSignalPending = true; + vertex.startTimeRequested = vertex.clock.getTime(); } } - + public static class StartTransition implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { @@ -3517,7 +3518,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public VertexState transition(VertexImpl vertex, VertexEvent event) { Preconditions.checkState(vertex.getState() == VertexState.INITED, "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier); - vertex.startTimeRequested = vertex.clock.getTime(); + // if the start signal is pending this event is a fake start event to trigger this transition + if (!vertex.startSignalPending) { + vertex.startTimeRequested = vertex.clock.getTime(); + } return vertex.startVertex(); } } @@ -4083,7 +4087,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID, int fromEventId, int preRoutedFromEventId, int maxEvents) { - ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents( + Task task = getTask(attemptID.getTaskID()); + ArrayList<TezEvent> events = task.getTaskAttemptTezEvents( attemptID, preRoutedFromEventId, maxEvents); int nextPreRoutedFromEventId = preRoutedFromEventId + events.size(); int nextFromEventId = fromEventId; @@ -4169,6 +4174,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } finally { onDemandRouteEventsReadLock.unlock(); } + if (!events.isEmpty()) { + for (int i=(events.size() - 1); i>=0; --i) { + TezEvent lastEvent = events.get(i); + // record the last event sent by the AM to the task + EventType lastEventType = lastEvent.getEventType(); + // if the following changes then critical path logic/recording may need revision + if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT || + lastEventType == 
EventType.DATA_MOVEMENT_EVENT || + lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) { + task.getAttempt(attemptID).setLastEventSent(lastEvent); + break; + } + } + } return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId); } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 0cc6666..803159a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -231,11 +231,12 @@ public class VertexManager { Collection<InputDataInformationEvent> events) { checkAndThrowIfDone(); verifyIsRootInput(inputName); + final long currTime = appContext.getClock().getTime(); Collection<TezEvent> tezEvents = Collections2.transform(events, new Function<InputDataInformationEvent, TezEvent>() { @Override public TezEvent apply(InputDataInformationEvent riEvent) { - TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata); + TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime); tezEvent.setDestinationInfo(getDestinationMetaData(inputName)); return tezEvent; } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index a6b403d..7d6da8a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -32,5 +32,6 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{ public Container getContainer(); public List<TezTaskAttemptID> getAllTaskAttempts(); public TezTaskAttemptID getCurrentTaskAttempt(); + public long getCurrentTaskAttemptAllocationTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 330f2b7..9b90752 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -95,6 +95,7 @@ public class AMContainerImpl implements AMContainer { private boolean nodeFailed = false; private TezTaskAttemptID currentAttempt; + private long currentAttemptAllocationTime; private List<TezTaskAttemptID> failedAssignments; private boolean inError = false; @@ -362,6 +363,16 @@ public class AMContainerImpl implements AMContainer { } } + @Override + public long getCurrentTaskAttemptAllocationTime() { + readLock.lock(); + try { + return this.currentAttemptAllocationTime; + } finally { + readLock.unlock(); + } + } + public boolean isInErrorState() { return inError; } @@ -532,6 +543,7 @@ public class AMContainerImpl implements AMContainer { // Register the additional resources back for this container. 
container.containerLocalResources.putAll(container.additionalLocalResources); container.currentAttempt = event.getTaskAttemptId(); + container.currentAttemptAllocationTime = container.appContext.getClock().getTime(); if (LOG.isDebugEnabled()) { LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec()); LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources); http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index af529bf..fbde635 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -21,16 +21,22 @@ package org.apache.tez.dag.history.events; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto; public class TaskAttemptFinishedEvent implements HistoryEvent { @@ -45,14 +51,16 @@ public class 
TaskAttemptFinishedEvent implements HistoryEvent { private String diagnostics; private TezCounters tezCounters; private TaskAttemptTerminationCause error; - + private List<DataEventDependencyInfo> dataEvents; + public TaskAttemptFinishedEvent(TezTaskAttemptID taId, String vertexName, long startTime, long finishTime, TaskAttemptState state, TaskAttemptTerminationCause error, - String diagnostics, TezCounters counters) { + String diagnostics, TezCounters counters, + List<DataEventDependencyInfo> dataEvents) { this.taskAttemptId = taId; this.vertexName = vertexName; this.startTime = startTime; @@ -61,6 +69,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.diagnostics = diagnostics; this.tezCounters = counters; this.error = error; + this.dataEvents = dataEvents; } public TaskAttemptFinishedEvent() { @@ -80,7 +89,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { public boolean isHistoryEvent() { return true; } - + + public List<DataEventDependencyInfo> getDataEvents() { + return dataEvents; + } + public TaskAttemptFinishedProto toProto() { TaskAttemptFinishedProto.Builder builder = TaskAttemptFinishedProto.newBuilder(); @@ -96,6 +109,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { if (tezCounters != null) { builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters)); } + if (dataEvents != null && !dataEvents.isEmpty()) { + for (DataEventDependencyInfo info : dataEvents) { + builder.addDataEvents(DataEventDependencyInfo.toProto(info)); + } + } return builder.build(); } @@ -113,6 +131,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.tezCounters = DagTypeConverters.convertTezCountersFromProto( proto.getCounters()); } + if (proto.getDataEventsCount() > 0) { + this.dataEvents = Lists.newArrayListWithCapacity(proto.getDataEventsCount()); + for (DataEventDependencyInfoProto protoEvent : proto.getDataEventsList()) { + 
this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent)); + } + } } @Override @@ -140,6 +164,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { + ", status=" + state.name() + ", errorEnum=" + (error != null ? error.name() : "") + ", diagnostics=" + diagnostics + + ", lastDataEventSourceTA=" + + ((dataEvents==null) ? 0:dataEvents.size()) + ", counters=" + (tezCounters == null ? "null" : tezCounters.toString() .replaceAll("\\n", ", ").replaceAll("\\s+", " ")); http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index 36add86..4d15fb9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -36,24 +36,30 @@ public class TaskAttemptStartedEvent implements HistoryEvent { private String inProgressLogsUrl; private String completedLogsUrl; private String vertexName; - private long startTime; + private long launchTime; private ContainerId containerId; private NodeId nodeId; private String nodeHttpAddress; + private TezTaskAttemptID creationCausalTA; + private long creationTime; + private long allocationTime; public TaskAttemptStartedEvent(TezTaskAttemptID taId, - String vertexName, long startTime, + String vertexName, long launchTime, ContainerId containerId, NodeId nodeId, String inProgressLogsUrl, String completedLogsUrl, - String nodeHttpAddress) { + String nodeHttpAddress, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) { this.taskAttemptId = taId; this.vertexName = vertexName; - this.startTime = startTime; + this.launchTime = launchTime; 
this.containerId = containerId; this.nodeId = nodeId; this.inProgressLogsUrl = inProgressLogsUrl; this.completedLogsUrl = completedLogsUrl; this.nodeHttpAddress = nodeHttpAddress; + this.creationTime = creationTime; + this.creationCausalTA = creationCausalTA; + this.allocationTime = allocationTime; } public TaskAttemptStartedEvent() { @@ -75,19 +81,29 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } public TaskAttemptStartedProto toProto() { - return TaskAttemptStartedProto.newBuilder() - .setTaskAttemptId(taskAttemptId.toString()) - .setStartTime(startTime) + TaskAttemptStartedProto.Builder builder = TaskAttemptStartedProto.newBuilder(); + builder.setTaskAttemptId(taskAttemptId.toString()) + .setStartTime(launchTime) .setContainerId(containerId.toString()) .setNodeId(nodeId.toString()) - .build(); + .setCreationTime(creationTime) + .setAllocationTime(allocationTime); + if (creationCausalTA != null) { + builder.setCreationCausalTA(creationCausalTA.toString()); + } + return builder.build(); } public void fromProto(TaskAttemptStartedProto proto) { this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); - this.startTime = proto.getStartTime(); + this.launchTime = proto.getStartTime(); this.containerId = ConverterUtils.toContainerId(proto.getContainerId()); this.nodeId = ConverterUtils.toNodeId(proto.getNodeId()); + this.creationTime = proto.getCreationTime(); + this.allocationTime = proto.getAllocationTime(); + if (proto.hasCreationCausalTA()) { + this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA()); + } } @Override @@ -108,7 +124,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent { public String toString() { return "vertexName=" + vertexName + ", taskAttemptId=" + taskAttemptId - + ", startTime=" + startTime + + ", creationTime=" + creationTime + + ", allocationTime=" + allocationTime + + ", startTime=" + launchTime + ", containerId=" + containerId + ", nodeId=" + nodeId + ", 
inProgressLogs=" + inProgressLogsUrl @@ -120,7 +138,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } public long getStartTime() { - return startTime; + return launchTime; + } + + public long getCreationTime() { + return creationTime; + } + + public long getAllocationTime() { + return allocationTime; + } + + public TezTaskAttemptID getCreationCausalTA() { + return creationCausalTA; } public ContainerId getContainerId() { http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java index 0310a26..6f44f33 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java @@ -145,6 +145,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent { if (event.getDestinationInfo() != null) { evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo())); } + evtBuilder.setEventTime(event.getEventReceivedTime()); tezEventProtos.add(evtBuilder.build()); } } @@ -184,7 +185,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent { if (eventProto.hasDestinationInfo()) { destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo()); } - TezEvent tezEvent = new TezEvent(evt, sourceInfo); + TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime()); tezEvent.setDestinationInfo(destinationInfo); this.events.add(tezEvent); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 07ce2f3..411d677 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -530,6 +530,10 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); otherInfo.put(ATSConstants.COUNTERS, DAGUtils.convertCountersToJSON(event.getCounters())); + if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) { + otherInfo.put(ATSConstants.LAST_DATA_EVENTS, + DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents())); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; @@ -573,6 +577,11 @@ public class HistoryEventJsonConversion { JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + otherInfo.put(ATSConstants.CREATION_TIME, event.getCreationTime()); + otherInfo.put(ATSConstants.ALLOCATION_TIME, event.getAllocationTime()); + if (event.getCreationCausalTA() != null) { + otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT, event.getCreationCausalTA().toString()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 1447832..76e592e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; @@ -42,6 +43,8 @@ import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; +import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.records.TezTaskID; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -99,6 +102,27 @@ public class DAGUtils { } return dagJson; } + + public static JSONObject convertDataEventDependencyInfoToJSON(List<DataEventDependencyInfo> info) { + return new JSONObject(convertDataEventDependecyInfoToATS(info)); + } + + public static Map<String, Object> convertDataEventDependecyInfoToATS(List<DataEventDependencyInfo> info) { + ArrayList<Object> infoList = new ArrayList<Object>(); + for (DataEventDependencyInfo event : info) { + Map<String, Object> eventObj = new LinkedHashMap<String, Object>(); + String id = ""; + if (event.getTaskAttemptId() != null) { + id = event.getTaskAttemptId().toString(); + } + eventObj.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), id); + eventObj.put(ATSConstants.TIMESTAMP, event.getTimestamp()); + infoList.add(eventObj); + } + Map<String,Object> object = new LinkedHashMap<String, Object>(); + putInto(object, ATSConstants.LAST_DATA_EVENTS, infoList); + return 
object; + } public static JSONObject convertCountersToJSON(TezCounters counters) throws JSONException { http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index 8af48b6..232f1b7 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -164,6 +164,14 @@ message TaskAttemptStartedProto { optional int64 start_time = 2; optional string container_id = 3; optional string node_id = 4; + optional int64 creation_time = 5; + optional string creation_causal_t_a = 6; + optional int64 allocation_time = 7; +} + +message DataEventDependencyInfoProto { + optional string task_attempt_id = 1; + optional int64 timestamp = 2; } message TaskAttemptFinishedProto { @@ -173,6 +181,7 @@ message TaskAttemptFinishedProto { optional string diagnostics = 4; optional TezCountersProto counters = 5; optional string error_enum = 6; + repeated DataEventDependencyInfoProto data_events = 7; } message EventMetaDataProto { @@ -189,6 +198,7 @@ message TezDataMovementEventProto { optional CompositeEventProto composite_data_movement_event = 4; optional RootInputDataInformationEventProto root_input_data_information_event = 5; optional RootInputInitializerEventProto input_initializer_event = 6; + optional int64 event_time = 7; } message VertexDataMovementEventsGeneratedProto { http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 08f6ff6..8fa57d3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -114,7 +114,7 @@ public class MockDAGAppMaster extends DAGAppMaster { } public static interface EventsDelegate { - public void getEvents(TaskSpec taskSpec, List<TezEvent> events); + public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time); } // mock container launcher does not launch real tasks. @@ -408,7 +408,7 @@ public class MockDAGAppMaster extends DAGAppMaster { List<TezEvent> events = Lists.newArrayListWithCapacity( cData.taskSpec.getOutputs().size() + 1); if (cData.numUpdates == 0 && eventsDelegate != null) { - eventsDelegate.getEvents(cData.taskSpec, events); + eventsDelegate.getEvents(cData.taskSpec, events, getContext().getClock().getTime()); } TezCounters counters = null; if (countersDelegate != null) { @@ -422,7 +422,8 @@ public class MockDAGAppMaster extends DAGAppMaster { float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1; float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f; events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData( - EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), + getContext().getClock().getTime())); TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000); doHeartbeat(request, cData); @@ -433,7 +434,8 @@ public class MockDAGAppMaster extends DAGAppMaster { cData.completed = true; List<TezEvent> events = Collections.singletonList(new TezEvent( new TaskAttemptCompletedEvent(), new EventMetaData( - EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), + getContext().getClock().getTime())); TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events, cData.nextPreRoutedFromEventId, 
cData.cIdStr, cData.taId, cData.nextFromEventId, 10000); doHeartbeat(request, cData); http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 4137d42..42d4b0b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -126,17 +126,17 @@ public class TestMockDAGAppMaster { static class TestEventsDelegate implements EventsDelegate { @Override - public void getEvents(TaskSpec taskSpec, List<TezEvent> events) { + public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) { for (OutputSpec output : taskSpec.getOutputs()) { if (output.getPhysicalEdgeCount() == 1) { events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData( EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output - .getDestinationVertexName(), taskSpec.getTaskAttemptID()))); + .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time)); } else { events.add(new TezEvent(CompositeDataMovementEvent.create(0, output.getPhysicalEdgeCount(), null), new EventMetaData( EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output - .getDestinationVertexName(), taskSpec.getTaskAttemptID()))); + .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time)); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 5c24ecc..641f3a2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -47,6 +47,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; @@ -58,6 +59,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; @@ -99,12 +101,15 @@ public class TestTaskAttemptListenerImplTezDag { eventHandler = mock(EventHandler.class); + MockClock clock = new MockClock(); + appContext = mock(AppContext.class); doReturn(eventHandler).when(appContext).getEventHandler(); doReturn(dag).when(appContext).getCurrentDAG(); doReturn(appAcls).when(appContext).getApplicationACLs(); doReturn(amContainerMap).when(appContext).getAllContainers(); - + doReturn(clock).when(appContext).getClock(); + taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); @@ -198,6 +203,7 @@ public class TestTaskAttemptListenerImplTezDag { final Event statusUpdateEvent = argAllValues.get(0); assertEquals("First event should be status update", 
TaskAttemptEventType.TA_STATUS_UPDATE, statusUpdateEvent.getType()); + assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported()); final Event vertexEvent = argAllValues.get(1); final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; @@ -207,9 +213,39 @@ public class TestTaskAttemptListenerImplTezDag { vertexRouteEvent.getEvents().get(0).getEventType()); assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, vertexRouteEvent.getEvents().get(1).getEventType()); + } + + @Test (timeout = 5000) + public void testTaskEventRoutingWithReadError() throws Exception { + List<TezEvent> events = Arrays.asList( + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), + new TezEvent(InputReadErrorEvent.create("", 0, 0), null), + new TezEvent(new TaskAttemptCompletedEvent(), null) + ); + + generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(arg.capture()); + final List<Event> argAllValues = arg.getAllValues(); + + final Event statusUpdateEvent = argAllValues.get(0); + assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, + statusUpdateEvent.getType()); + assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported()); + + final Event vertexEvent = argAllValues.get(1); + final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; + assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, + vertexEvent.getType()); + assertEquals(EventType.INPUT_READ_ERROR_EVENT, + vertexRouteEvent.getEvents().get(0).getEventType()); + assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, + vertexRouteEvent.getEvents().get(1).getEventType()); } + @Test (timeout = 5000) public void testTaskEventRoutingTaskAttemptOnly() throws Exception { List<TezEvent> events = Arrays.asList( 
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 5d05fa3..1a1cb11 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -587,6 +588,94 @@ public class TestTaskAttempt { } @Test(timeout = 5000) + public void testLastDataEventRecording() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = 
Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, new SystemClock(), + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + // At state STARTING. 
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + + long ts1 = 1024; + long ts2 = 2048; + TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class); + TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class); + TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS); + when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1); + when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1); + TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS); + when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2); + when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2); + TaskAttemptEventStatusUpdate statusEvent = + new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)); + + assertEquals(0, taImpl.lastDataEvents.size()); + taImpl.setLastEventSent(mockTezEvent1); + assertEquals(1, taImpl.lastDataEvents.size()); + assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp()); + assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId()); + taImpl.handle(statusEvent); + taImpl.setLastEventSent(mockTezEvent2); + assertEquals(1, taImpl.lastDataEvents.size()); + assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp()); + assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value + statusEvent.setReadErrorReported(true); + taImpl.handle(statusEvent); + taImpl.setLastEventSent(mockTezEvent1); + assertEquals(2, taImpl.lastDataEvents.size()); + assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp()); + assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event + taImpl.setLastEventSent(mockTezEvent2); + assertEquals(2, taImpl.lastDataEvents.size()); + assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp()); + assertEquals(mockId2, 
taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value + } + + @Test(timeout = 5000) public void testFailure() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index 0665b1e..90274eb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -19,7 +19,6 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -52,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.HistoryEventType; @@ -66,12 +66,16 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.google.common.collect.Lists; + @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestTaskAttemptRecovery { private TaskAttemptImpl ta; private EventHandler mockEventHandler; - private long startTime = System.currentTimeMillis(); + 
private long creationTime = System.currentTimeMillis(); + private long allocationTime = creationTime + 5000; + private long startTime = allocationTime + 5000; private long finishTime = startTime + 5000; private TezTaskAttemptID taId; @@ -153,9 +157,14 @@ public class TestTaskAttemptRecovery { } private void restoreFromTAStartEvent() { + TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1); TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", creationTime, causalId, + allocationTime)); + assertEquals(causalId, ta.getCreationCausalAttempt()); + assertEquals(creationTime, ta.getCreationTime()); + assertEquals(allocationTime, ta.getAllocationTime()); assertEquals(startTime, ta.getLaunchTime()); assertEquals(TaskAttemptState.RUNNING, recoveredState); } @@ -169,9 +178,14 @@ public class TestTaskAttemptRecovery { errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR; } + long lastDataEventTime = 1024; + TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class); + List<DataEventDependencyInfo> events = Lists.newLinkedList(); + events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA)); + events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA)); TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - startTime, finishTime, state, errorEnum, diag, counters)); + startTime, finishTime, state, errorEnum, diag, counters, events)); assertEquals(startTime, ta.getLaunchTime()); assertEquals(finishTime, ta.getFinishTime()); assertEquals(counters, ta.reportedStatus.counters); @@ -180,6 +194,9 @@ public class TestTaskAttemptRecovery { assertEquals(1, ta.getDiagnostics().size()); assertEquals(diag, ta.getDiagnostics().get(0)); assertEquals(state, recoveredState); + 
assertEquals(events.size(), ta.lastDataEvents.size()); + assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp()); + assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId()); if (state != TaskAttemptState.SUCCEEDED) { assertEquals(errorEnum, ta.getTerminationCause()); } else { @@ -304,7 +321,7 @@ public class TestTaskAttemptRecovery { TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, startTime, finishTime, TaskAttemptState.KILLED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters())); + TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null)); assertEquals(TaskAttemptState.KILLED, recoveredState); } } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 87dd2fa..1ee6c4e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -56,6 +56,7 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; @@ -72,6 +73,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import 
org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -359,7 +361,10 @@ public class TestTaskImpl { LOG.info("--- START: testKillScheduledTaskAttempt ---"); TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); killScheduledTaskAttempt(mockTask.getLastAttempt().getID()); + // last killed attempt should be causal TA of next attempt + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); } @Test(timeout = 5000) @@ -383,8 +388,11 @@ public class TestTaskImpl { LOG.info("--- START: testKillRunningTaskAttempt ---"); TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); launchTaskAttempt(mockTask.getLastAttempt().getID()); killRunningTaskAttempt(mockTask.getLastAttempt().getID()); + // last killed attempt should be causal TA of next attempt + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); } /** @@ -505,11 +513,15 @@ public class TestTaskImpl { // During the task attempt commit there is an exception which causes // the attempt to fail + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED); + assertEquals(1, mockTask.getAttemptList().size()); failRunningTaskAttempt(mockTask.getLastAttempt().getID()); assertEquals(2, mockTask.getAttemptList().size()); assertEquals(1, mockTask.failedAttempts); + // last failed attempt should be the causal TA + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); assertFalse("First attempt should not commit", mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); @@ -553,6 +565,7 @@ public class TestTaskImpl { scheduleTaskAttempt(taskId); 
launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); // Add a speculative task attempt that succeeds mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), @@ -560,6 +573,11 @@ public class TestTaskImpl { launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + assertEquals(2, mockTask.getAttemptList().size()); + + // previous running attempt should be the causal TA of this speculative attempt + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); + assertTrue("Second attempt should commit", mockTask.canCommit(mockTask.getAttemptList().get(1).getID())); assertFalse("First attempt should not commit", @@ -602,8 +620,14 @@ public class TestTaskImpl { eventHandler.events.clear(); // Now fail the attempt after it has succeeded + TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class); + TezEvent mockTezEvent = mock(TezEvent.class); + EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId); + when(mockTezEvent.getSourceInfo()).thenReturn(meta); + TaskAttemptEventOutputFailed outputFailedEvent = + new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1); mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt() - .getID(), TaskEventType.T_ATTEMPT_FAILED)); + .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent)); // The task should still be in the scheduled state assertTaskScheduledState(); @@ -611,6 +635,12 @@ public class TestTaskImpl { Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType()); event = eventHandler.events.get(eventHandler.events.size()-1); Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType()); + + // report of output read error should be the causal TA + List<MockTaskAttemptImpl> attempts = 
mockTask.getAttemptList(); + Assert.assertEquals(2, attempts.size()); + MockTaskAttemptImpl newAttempt = attempts.get(1); + Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA()); } @Test(timeout = 5000) @@ -695,11 +725,11 @@ public class TestTaskImpl { } @Override - protected TaskAttemptImpl createAttempt(int attemptNumber) { + protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - true, taskResource, containerContext); + true, taskResource, containerContext, schedCausalTA); taskAttempts.add(attempt); return attempt; } @@ -746,9 +776,10 @@ public class TestTaskImpl { EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean isRescheduled, - Resource resource, ContainerContext containerContext) { + Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, - appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class)); + appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), + schedCausalTA); } @Override
