Repository: tez Updated Branches: refs/heads/master 73da831e8 -> 6a99798f2
TEZ-2647. Add input causality dependency for attempts (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6a99798f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6a99798f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6a99798f Branch: refs/heads/master Commit: 6a99798f250310f8bd819aae89123e6146364983 Parents: 73da831 Author: Bikas Saha <[email protected]> Authored: Tue Aug 4 10:42:40 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Aug 4 10:43:09 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 2 ++ .../dag/app/TaskAttemptListenerImpTezDag.java | 5 ++- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 3 ++ .../org/apache/tez/dag/app/dag/impl/Edge.java | 13 +++++--- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 21 ++++++++++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 17 +++++++++- .../tez/dag/app/dag/impl/VertexManager.java | 3 +- .../events/TaskAttemptFinishedEvent.java | 27 +++++++++++++++- .../VertexRecoverableEventsGeneratedEvent.java | 3 +- .../impl/HistoryEventJsonConversion.java | 4 +++ tez-dag/src/main/proto/HistoryEvents.proto | 3 ++ .../apache/tez/dag/app/MockDAGAppMaster.java | 10 +++--- .../tez/dag/app/TestMockDAGAppMaster.java | 6 ++-- .../app/TestTaskAttemptListenerImplTezDag.java | 5 ++- .../app/dag/impl/TestTaskAttemptRecovery.java | 8 +++-- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 34 ++++++++++---------- .../tez/dag/app/dag/impl/TestVertexImpl.java | 6 +++- .../TestHistoryEventsProtoConversion.java | 13 ++++++-- .../impl/TestHistoryEventJsonConversion.java | 2 +- .../parser/datamodel/TaskAttemptInfo.java | 15 +++++++++ .../apache/tez/history/TestATSFileParser.java | 24 +++++++++++++- .../ats/HistoryEventTimelineConversion.java | 6 +++- .../ats/TestHistoryEventTimelineConversion.java | 9 ++++-- .../apache/tez/runtime/api/impl/TezEvent.java | 17 ++++++++++ 25 files changed, 207 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 59307b7..1f85954 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead. TEZ-2468. Change the minimum Java version to Java 7. TEZ-2646. Add scheduling casual dependency for attempts + TEZ-2647. Add input causality dependency for attempts ALL CHANGES: TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation. http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 4bf9f6d..1568b96 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -81,6 +81,8 @@ public class ATSConstants { public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL"; public static final String COMPLETED_LOGS_URL = "completedLogsURL"; public static final String EXIT_STATUS = "exitStatus"; + public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime"; + public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA"; public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers"; public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt"; http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index fe92f3a..b7896c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.collections4.ListUtils; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; import org.slf4j.Logger; @@ -424,6 +423,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements " events: " + (inEvents != null? inEvents.size() : -1)); } + long currTime = context.getClock().getTime(); List<TezEvent> otherEvents = new ArrayList<TezEvent>(); // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) @@ -431,6 +431,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + // for now, set the event time on the AM when it is received. + // this avoids any time disparity between machines. + tezEvent.setEventReceivedTime(currTime); final EventType eventType = tezEvent.getEventType(); if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 6c85cc2..4360cc3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.TezEvent; /** * Read only view of TaskAttempt. @@ -79,6 +80,8 @@ public interface TaskAttempt { float getProgress(); TaskAttemptState getState(); TaskAttemptState getStateNoLock(); + + void setLastEventSent(TezEvent lastEventSent); /** * Has attempt reached the final state or not. http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index ddccf8d..da74a46 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -377,7 +377,7 @@ public class Edge { EventMetaData srcInfo = tezEvent.getSourceInfo(); for (DataMovementEvent dmEvent : compEvent.getEvents()) { - TezEvent newEvent = new TezEvent(dmEvent, srcInfo); + TezEvent newEvent = new TezEvent(dmEvent, srcInfo, tezEvent.getEventReceivedTime()); sendTezEventToDestinationTasks(newEvent); } } @@ -406,7 +406,7 @@ public class Edge { InputFailedEvent ifEvent = ((InputFailedEvent) event); e = InputFailedEvent.create(inputIndex, ifEvent.getVersion()); } - tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); // cache the event object per input because are unique per input index inputIndicesWithEvents.put(inputIndex, tezEventToSend); @@ -553,7 +553,8 @@ public class Edge { DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone], targetIndices[numEventsDone]); numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), + tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); listToAdd.add(tezEventToSend); } @@ -585,7 +586,8 @@ public class Edge { while (numEventsDone < numEvents && listSize++ < listMaxSize) { InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]); numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), + tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); listToAdd.add(tezEventToSend); } @@ -617,7 +619,8 @@ public class Edge { while (numEventsDone < numEvents && listSize++ < listMaxSize) { DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]); numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo()); + TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), + tezEvent.getEventReceivedTime()); tezEventToSend.setDestinationInfo(destinationMetaInfo); listToAdd.add(tezEventToSend); } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 40636dd..ebf7c58 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -147,6 +147,10 @@ public class TaskAttemptImpl implements TaskAttempt, private final Vertex vertex; @VisibleForTesting + long lastDataEventTime; + TezTaskAttemptID lastDataEventSourceTA = null; + + @VisibleForTesting TaskAttemptStatus reportedStatus; private DAGCounter localityCounter; @@ -754,6 +758,8 @@ public class TaskAttemptImpl implements TaskAttempt, : TaskAttemptTerminationCause.UNKNOWN_ERROR; this.diagnostics.add(tEvent.getDiagnostics()); this.recoveredState = tEvent.getState(); + this.lastDataEventTime = tEvent.getLastDataEventTime(); + this.lastDataEventSourceTA = tEvent.getLastDataEventSourceTA(); sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState())); return recoveredState; } @@ -969,7 +975,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), getFinishTime(), TaskAttemptState.SUCCEEDED, null, - "", getCounters()); + "", getCounters(), lastDataEventTime, lastDataEventSourceTA); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -982,7 +988,8 @@ public class TaskAttemptImpl implements TaskAttempt, clock.getTime(), state, terminationCause, StringUtils.join( - getDiagnostics(), LINE_SEPARATOR), getCounters()); + getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime, + lastDataEventSourceTA); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1485,7 +1492,7 @@ public class TaskAttemptImpl implements TaskAttempt, new EventMetaData(EventProducerConsumerType.SYSTEM, vertex.getName(), edgeVertex.getName(), - getID()))); + getID()), appContext.getClock().getTime())); } sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents)); } @@ -1562,4 +1569,12 @@ public class TaskAttemptImpl implements TaskAttempt, public String toString() { return getID().toString(); } + + + @Override + public void setLastEventSent(TezEvent lastEventSent) { + // task attempt id may be null for input data information events + this.lastDataEventSourceTA = lastEventSent.getSourceInfo().getTaskAttemptID(); + this.lastDataEventTime = lastEventSent.getEventReceivedTime(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 3888c7a..9519fa9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -4092,7 +4092,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID, int fromEventId, int preRoutedFromEventId, int maxEvents) { - ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents( + Task task = getTask(attemptID.getTaskID()); + ArrayList<TezEvent> events = task.getTaskAttemptTezEvents( attemptID, preRoutedFromEventId, maxEvents); int nextPreRoutedFromEventId = preRoutedFromEventId + events.size(); int nextFromEventId = fromEventId; @@ -4192,6 +4193,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } finally { onDemandRouteEventsReadLock.unlock(); } + if (!events.isEmpty()) { + for (int i=(events.size() - 1); i>=0; --i) { + TezEvent lastEvent = events.get(i); + // record the last event sent by the AM to the task + EventType lastEventType = lastEvent.getEventType(); + // if the following changes then critical path logic/recording may need revision + if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT || + lastEventType == EventType.DATA_MOVEMENT_EVENT || + lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) { + task.getAttempt(attemptID).setLastEventSent(lastEvent); + break; + } + } + } return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId); } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index caa3432..64eb80f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -230,11 +230,12 @@ public class VertexManager { Collection<InputDataInformationEvent> events) { checkAndThrowIfDone(); verifyIsRootInput(inputName); + final long currTime = appContext.getClock().getTime(); Collection<TezEvent> tezEvents = Collections2.transform(events, new Function<InputDataInformationEvent, TezEvent>() { @Override public TezEvent apply(InputDataInformationEvent riEvent) { - TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata); + TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime); tezEvent.setDestinationInfo(getDestinationMetaData(inputName)); return tezEvent; } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index af529bf..52761e2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -45,6 +45,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private String diagnostics; private TezCounters tezCounters; private TaskAttemptTerminationCause error; + private TezTaskAttemptID lastDataEventSourceTA; + private long lastDataEventTime; public TaskAttemptFinishedEvent(TezTaskAttemptID taId, String vertexName, @@ -52,7 +54,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { long finishTime, TaskAttemptState state, TaskAttemptTerminationCause error, - String diagnostics, TezCounters counters) { + String diagnostics, TezCounters counters, + long lastDataEventTime, + TezTaskAttemptID lastDataEventSourceTA) { this.taskAttemptId = taId; this.vertexName = vertexName; this.startTime = startTime; @@ -61,6 +65,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.diagnostics = diagnostics; this.tezCounters = counters; this.error = error; + this.lastDataEventTime = lastDataEventTime; + this.lastDataEventSourceTA = lastDataEventSourceTA; } public TaskAttemptFinishedEvent() { @@ -80,6 +86,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { public boolean isHistoryEvent() { return true; } + + public long getLastDataEventTime() { + return lastDataEventTime; + } + + public TezTaskAttemptID getLastDataEventSourceTA() { + return lastDataEventSourceTA; + } public TaskAttemptFinishedProto toProto() { TaskAttemptFinishedProto.Builder builder = @@ -96,6 +110,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { if (tezCounters != null) { builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters)); } + if (lastDataEventSourceTA != null) { + builder.setLastDataEventSourceTA(lastDataEventSourceTA.toString()); + builder.setLastDataEventTime(lastDataEventTime); + } return builder.build(); } @@ -113,6 +131,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.tezCounters = DagTypeConverters.convertTezCountersFromProto( proto.getCounters()); } + if (proto.hasLastDataEventSourceTA()) { + this.lastDataEventSourceTA = TezTaskAttemptID.fromString(proto.getLastDataEventSourceTA()); + this.lastDataEventTime = proto.getLastDataEventTime(); + } } @Override @@ -140,6 +162,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { + ", status=" + state.name() + ", errorEnum=" + (error != null ? error.name() : "") + ", diagnostics=" + diagnostics + + ", lastDataEventSourceTA=" + + ((lastDataEventSourceTA==null) ? null:lastDataEventSourceTA.toString()) + + ", lastDataEventTime=" + lastDataEventTime + ", counters=" + (tezCounters == null ? "null" : tezCounters.toString() .replaceAll("\\n", ", ").replaceAll("\\s+", " ")); http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java index 0310a26..6f44f33 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java @@ -145,6 +145,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent { if (event.getDestinationInfo() != null) { evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo())); } + evtBuilder.setEventTime(event.getEventReceivedTime()); tezEventProtos.add(evtBuilder.build()); } } @@ -184,7 +185,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent { if (eventProto.hasDestinationInfo()) { destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo()); } - TezEvent tezEvent = new TezEvent(evt, sourceInfo); + TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime()); tezEvent.setDestinationInfo(destinationInfo); this.events.add(tezEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 3fdfe0a..528da10 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -530,6 +530,10 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); otherInfo.put(ATSConstants.COUNTERS, DAGUtils.convertCountersToJSON(event.getCounters())); + otherInfo.put(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime()); + if (event.getLastDataEventSourceTA() != null) { + otherInfo.put(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, event.getLastDataEventSourceTA().toString()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index 402349b..ffb382e 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -175,6 +175,8 @@ message TaskAttemptFinishedProto { optional string diagnostics = 4; optional TezCountersProto counters = 5; optional string error_enum = 6; + optional int64 last_data_event_time = 7; + optional string last_data_event_source_t_a = 8; } message EventMetaDataProto { @@ -191,6 +193,7 @@ message TezDataMovementEventProto { optional CompositeEventProto composite_data_movement_event = 4; optional RootInputDataInformationEventProto root_input_data_information_event = 5; optional RootInputInitializerEventProto input_initializer_event = 6; + optional int64 event_time = 7; } message VertexDataMovementEventsGeneratedProto { http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 08f6ff6..8fa57d3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -114,7 +114,7 @@ public class MockDAGAppMaster extends DAGAppMaster { } public static interface EventsDelegate { - public void getEvents(TaskSpec taskSpec, List<TezEvent> events); + public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time); } // mock container launcher does not launch real tasks. @@ -408,7 +408,7 @@ public class MockDAGAppMaster extends DAGAppMaster { List<TezEvent> events = Lists.newArrayListWithCapacity( cData.taskSpec.getOutputs().size() + 1); if (cData.numUpdates == 0 && eventsDelegate != null) { - eventsDelegate.getEvents(cData.taskSpec, events); + eventsDelegate.getEvents(cData.taskSpec, events, getContext().getClock().getTime()); } TezCounters counters = null; if (countersDelegate != null) { @@ -422,7 +422,8 @@ public class MockDAGAppMaster extends DAGAppMaster { float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1; float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f; events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData( - EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), + getContext().getClock().getTime())); TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000); doHeartbeat(request, cData); @@ -433,7 +434,8 @@ public class MockDAGAppMaster extends DAGAppMaster { cData.completed = true; List<TezEvent> events = Collections.singletonList(new TezEvent( new TaskAttemptCompletedEvent(), new EventMetaData( - EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), + getContext().getClock().getTime())); TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events, cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000); doHeartbeat(request, cData); http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 4137d42..42d4b0b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -126,17 +126,17 @@ public class TestMockDAGAppMaster { static class TestEventsDelegate implements EventsDelegate { @Override - public void getEvents(TaskSpec taskSpec, List<TezEvent> events) { + public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) { for (OutputSpec output : taskSpec.getOutputs()) { if (output.getPhysicalEdgeCount() == 1) { events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData( EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output - .getDestinationVertexName(), taskSpec.getTaskAttemptID()))); + .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time)); } else { events.add(new TezEvent(CompositeDataMovementEvent.create(0, output.getPhysicalEdgeCount(), null), new EventMetaData( EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output - .getDestinationVertexName(), taskSpec.getTaskAttemptID()))); + .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time)); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 5c24ecc..d8a7388 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -99,12 +99,15 @@ public class TestTaskAttemptListenerImplTezDag { eventHandler = mock(EventHandler.class); + MockClock clock = new MockClock(); + appContext = mock(AppContext.class); doReturn(eventHandler).when(appContext).getEventHandler(); doReturn(dag).when(appContext).getCurrentDAG(); doReturn(appAcls).when(appContext).getApplicationACLs(); doReturn(amContainerMap).when(appContext).getAllContainers(); - + doReturn(clock).when(appContext).getClock(); + taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index d632aa3..920109b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -169,9 +169,11 @@ public class TestTaskAttemptRecovery { errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR; } + long lastDataEventTime = 1024; + TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class); TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - startTime, finishTime, state, errorEnum, diag, counters)); + startTime, finishTime, state, errorEnum, diag, counters, lastDataEventTime, lastDataEventTA)); assertEquals(startTime, ta.getLaunchTime()); assertEquals(finishTime, ta.getFinishTime()); assertEquals(counters, ta.reportedStatus.counters); @@ -180,6 +182,8 @@ public class TestTaskAttemptRecovery { assertEquals(1, ta.getDiagnostics().size()); assertEquals(diag, ta.getDiagnostics().get(0)); assertEquals(state, recoveredState); + assertEquals(lastDataEventTime, ta.lastDataEventTime); + assertEquals(lastDataEventTA, ta.lastDataEventSourceTA); if (state != TaskAttemptState.SUCCEEDED) { assertEquals(errorEnum, ta.getTerminationCause()); } else { @@ -304,7 +308,7 @@ public class TestTaskAttemptRecovery { TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, startTime, finishTime, TaskAttemptState.KILLED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters())); + TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), 0, null)); assertEquals(TaskAttemptState.KILLED, recoveredState); } } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index feb290f..87e7498 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -286,7 +286,7 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters())); + 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), 0, null)); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -307,7 +307,7 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters())); + 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), 0, null)); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -329,7 +329,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); try { task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters())); + 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), 0, null)); fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)"); } catch (TezUncheckedException e) { assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover")); @@ -372,7 +372,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -405,7 +405,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -438,7 +438,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -473,7 +473,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -516,7 +516,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -528,7 +528,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -563,7 +563,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -575,7 +575,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -614,7 +614,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -654,7 +654,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -735,7 +735,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters())); + "", new TezCounters(), 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -776,7 +776,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.KILLED, null, "", null)); + 0, TaskAttemptState.KILLED, null, "", null, 0, null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(0, task.failedAttempts); @@ -806,7 +806,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null)); + 0, TaskAttemptState.FAILED, null, "", null, 0, null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(maxFailedAttempts, task.failedAttempts); @@ -836,7 +836,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null)); + 0, TaskAttemptState.FAILED, null, "", null, 0, null)); } assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); assertEquals(maxFailedAttempts - 1, task.failedAttempts); http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 98ef973..c55ea23 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -110,6 +110,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.MockClock; import org.apache.tez.dag.app.TaskAttemptEventInfo; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; @@ -2179,6 +2180,8 @@ public class TestVertexImpl { }}) .when(execService).submit((Callable<Void>) any()); + MockClock clock = new MockClock(); + doReturn(execService).when(appContext).getExecService(); doReturn(conf).when(appContext).getAMConf(); doReturn(new Credentials()).when(dag).getCredentials(); @@ -2189,7 +2192,8 @@ public class TestVertexImpl { doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources(); doReturn(historyEventHandler).when(appContext).getHistoryHandler(); doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler(); - + doReturn(clock).when(appContext).getClock(); + vertexGroups = Maps.newHashMap(); for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) { vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo)); http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 9be3531..a32cc27 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -505,7 +505,9 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - null, null, null); + null, null, null, 1024, + TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 110), 1), 1)); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -518,6 +520,8 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getState()); Assert.assertEquals(event.getCounters(), deserializedEvent.getCounters()); + Assert.assertEquals(event.getLastDataEventTime(), deserializedEvent.getLastDataEventTime()); + Assert.assertEquals(event.getLastDataEventSourceTA(), deserializedEvent.getLastDataEventSourceTA()); logEvents(event, deserializedEvent); } { @@ -525,7 +529,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters()); + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), 0, null); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -588,9 +592,10 @@ public class TestHistoryEventsProtoConversion { } catch (RuntimeException e) { // Expected } + long eventTime = 1024; List<TezEvent> events = Arrays.asList(new TezEvent(DataMovementEvent.create(1, null), - new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null))); + new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null), eventTime)); event = new VertexRecoverableEventsGeneratedEvent( TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events); @@ -601,6 +606,8 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getTezEvents().size()); Assert.assertEquals(event.getTezEvents().get(0).getEventType(), deserializedEvent.getTezEvents().get(0).getEventType()); + Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(), + deserializedEvent.getTezEvents().get(0).getEventReceivedTime()); logEvents(event, deserializedEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index db871a2..ec1603e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null); + random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, 0, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index cca984a..b412c46 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -20,6 +20,8 @@ package org.apache.tez.history.parser.datamodel; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; + +import org.apache.tez.common.ATSConstants; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; @@ -46,6 +48,9 @@ public class TaskAttemptInfo extends BaseInfo { private final String status; private final String logUrl; private final String schedulingCausalTA; + private final long lastDataEventTime; + private final String lastDataEventSourceTA; + private TaskInfo taskInfo; private Container container; @@ -75,6 +80,8 @@ public class TaskAttemptInfo extends BaseInfo { status = otherInfoNode.optString(Constants.STATUS); container = new Container(containerId, nodeId); + lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME); + lastDataEventSourceTA = otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA); } void setTaskInfo(TaskInfo taskInfo) { @@ -104,6 +111,14 @@ public class TaskAttemptInfo extends BaseInfo { public final long getAbsoluteScheduledTime() { return scheduledTime; } + + public final long getLastDataEventTime() { + return lastDataEventTime; + } + + public final String getLastDataEventSourceTA() { + return lastDataEventSourceTA; + } public final long getTimeTaken() { return getFinishTime() - getStartTime(); http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java index 1d59e98..d205056 100644 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java @@ -80,6 +80,7 @@ import java.io.BufferedWriter; import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertTrue; @@ -105,7 +106,6 @@ public class TestATSFileParser { "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez"; private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; - private static String timelineAddress; private static TezClient tezClient; private static int dagNumber; @@ -228,8 +228,11 @@ public class TestATSFileParser { assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName())); assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName())); assertTrue(dagInfo.getVertices().size() == 2); + String lastSourceTA = null; + String lastDataEventSourceTA = null; for (VertexInfo vertexInfo : dagInfo.getVertices()) { assertTrue(vertexInfo.getKilledTasksCount() == 0); + long finishTime = 0; for (TaskInfo taskInfo : vertexInfo.getTasks()) { assertTrue(taskInfo.getNumberOfTaskAttempts() == 1); assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0); @@ -240,6 +243,24 @@ public class TestATSFileParser { assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0); assertTrue(taskInfo.getFailedTaskAttempts().size() == 0); assertTrue(taskInfo.getKilledTaskAttempts().size() == 0); + List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts(); + if (vertexInfo.getVertexName().equals(TOKENIZER)) { + // get the last task to finish and track its successful attempt + if (finishTime < taskInfo.getAbsFinishTime()) { + finishTime = taskInfo.getAbsFinishTime(); + lastSourceTA = taskInfo.getSuccessfulAttemptId(); + } + } else { + for (TaskAttemptInfo attempt : attempts) { + assertTrue(attempt.getLastDataEventTime() > 0); + if (lastDataEventSourceTA == null) { + lastDataEventSourceTA = attempt.getLastDataEventSourceTA(); + } else { + // all attempts should have the same last data event source TA + assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA())); + } + } + } for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { assertTrue(attemptInfo.getStartTime() > 0); assertTrue(attemptInfo.getScheduledTime() > 0); @@ -258,6 +279,7 @@ public class TestATSFileParser { assertTrue(vertexInfo.getInputVertices().size() == 1); } } + assertTrue(lastSourceTA.equals(lastDataEventSourceTA)); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 95f77e2..eaed115 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -432,7 +432,11 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); atsEntity.addOtherInfo(ATSConstants.COUNTERS, DAGUtils.convertCountersToATSMap(event.getCounters())); - + atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime()); + if (event.getLastDataEventSourceTA() != null) { + atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, + event.getLastDataEventSourceTA().toString()); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index bf8d0ec..838d9d6 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -169,7 +169,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, 0, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -450,9 +450,10 @@ public class TestHistoryEventTimelineConversion { .values()[random.nextInt(TaskAttemptTerminationCause.values().length)]; String diagnostics = "random diagnostics message"; TezCounters counters = new TezCounters(); + long lastDataEventTime = finishTime - 1; TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, error, diagnostics, counters); + startTime, finishTime, state, error, diagnostics, counters, lastDataEventTime, tezTaskAttemptID); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -475,12 +476,14 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, evt.getTimestamp()); final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(6, otherInfo.size()); + Assert.assertEquals(8, otherInfo.size()); Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); + Assert.assertEquals(lastDataEventTime, otherInfo.get(ATSConstants.LAST_DATA_EVENT_TIME)); + Assert.assertEquals(tezTaskAttemptID.toString(), otherInfo.get(ATSConstants.LAST_DATA_EVENT_SOURCE_TA)); Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); } http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index 974e190..b44b7d4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -55,12 +55,19 @@ public class TezEvent implements Writable { private EventMetaData sourceInfo; private EventMetaData destinationInfo; + + private long eventReceivedTime; public TezEvent() { } public TezEvent(Event event, EventMetaData sourceInfo) { + this(event, sourceInfo, System.currentTimeMillis()); + } + + public TezEvent(Event event, EventMetaData sourceInfo, long time) { this.event = event; + this.eventReceivedTime = time; this.setSourceInfo(sourceInfo); if (event instanceof DataMovementEvent) { eventType = EventType.DATA_MOVEMENT_EVENT; @@ -91,6 +98,14 @@ public class TezEvent implements Writable { public Event getEvent() { return event; } + + public void setEventReceivedTime(long eventReceivedTime) { // TODO save + this.eventReceivedTime = eventReceivedTime; + } + + public long getEventReceivedTime() { + return eventReceivedTime; + } public EventMetaData getSourceInfo() { return sourceInfo; @@ -119,6 +134,7 @@ public class TezEvent implements Writable { } out.writeBoolean(true); out.writeInt(eventType.ordinal()); + out.writeLong(eventReceivedTime); if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) { // TODO NEWTEZ convert to PB TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event; @@ -188,6 +204,7 @@ public class TezEvent implements Writable { return; } eventType = EventType.values()[in.readInt()]; + eventReceivedTime = in.readLong(); if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) { // TODO NEWTEZ convert to PB event = new TaskStatusUpdateEvent();
