Repository: tez Updated Branches: refs/heads/master f37699f1a -> 4d381d778
TEZ-2646. Add scheduling causal dependency for attempts (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4d381d77 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4d381d77 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4d381d77 Branch: refs/heads/master Commit: 4d381d778f82d8d17ea9f718bbac3938ce4cc40d Parents: f37699f Author: Bikas Saha <[email protected]> Authored: Mon Aug 3 11:38:15 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Mon Aug 3 11:38:15 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 1 + .../dag/app/dag/event/TaskEventTAUpdate.java | 14 ++++- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 29 ++++++++-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 61 +++++++++++++++----- .../history/events/TaskAttemptStartedEvent.java | 43 ++++++++++---- .../impl/HistoryEventJsonConversion.java | 4 ++ tez-dag/src/main/proto/HistoryEvents.proto | 2 + .../app/dag/impl/TestTaskAttemptRecovery.java | 2 +- .../tez/dag/app/dag/impl/TestTaskImpl.java | 41 +++++++++++-- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 12 ++-- .../TestHistoryEventsProtoConversion.java | 9 ++- .../impl/TestHistoryEventJsonConversion.java | 2 +- .../parser/datamodel/TaskAttemptInfo.java | 7 ++- .../apache/tez/history/TestATSFileParser.java | 15 +++++ .../ats/HistoryEventTimelineConversion.java | 5 ++ .../ats/TestHistoryEventTimelineConversion.java | 9 ++- 17 files changed, 209 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fdd3ba9..303171a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2048. 
Remove VertexManagerPluginContext.getTaskContainer() TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead. TEZ-2468. Change the minimum Java version to Java 7. + TEZ-2646. Add scheduling causal dependency for attempts ALL CHANGES: TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation. http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index fd82e20..4bf9f6d 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -82,6 +82,7 @@ public class ATSConstants { public static final String COMPLETED_LOGS_URL = "completedLogsURL"; public static final String EXIT_STATUS = "exitStatus"; public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers"; + public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt"; /* Counters-related keys */ public static final String COUNTER_GROUPS = "counterGroups"; http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java index 59c7363..01eaf5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java @@ -18,19 +18,31 @@ package org.apache.tez.dag.app.dag.event; +import 
org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezTaskAttemptID; +@SuppressWarnings("rawtypes") public class TaskEventTAUpdate extends TaskEvent { private TezTaskAttemptID attemptID; + private TezAbstractEvent causalEvent; public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) { + this(id, type, null); + } + + public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) { super(id.getTaskID(), type); this.attemptID = id; + this.causalEvent = causalEvent; } - + public TezTaskAttemptID getTaskAttemptID() { return attemptID; } + + public TezAbstractEvent getCausalEvent() { + return causalEvent; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index f015155..40636dd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -165,6 +165,9 @@ public class TaskAttemptImpl implements TaskAttempt, private final Resource taskResource; private final ContainerContext containerContext; private final boolean leafVertex; + + private TezTaskAttemptID schedulingCausalTA; + private long scheduledTime; protected static final FailedTransitionHelper FAILED_HELPER = new FailedTransitionHelper(); @@ -374,12 +377,22 @@ public class TaskAttemptImpl implements TaskAttempt, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, Task task) { + this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock, + taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex, + task, null); + } + public 
TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, + TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, + TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, + boolean isRescheduled, + Resource resource, ContainerContext containerContext, boolean leafVertex, + Task task, TezTaskAttemptID schedulingCausalTA) { - this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration + MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); - this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -394,6 +407,8 @@ public class TaskAttemptImpl implements TaskAttempt, this.appContext = appContext; this.task = task; this.vertex = this.task.getVertex(); + this.schedulingCausalTA = schedulingCausalTA; + this.scheduledTime = clock.getTime(); this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); @@ -425,6 +440,10 @@ public class TaskAttemptImpl implements TaskAttempt, public TezDAGID getDAGID() { return getVertexID().getDAGId(); } + + public TezTaskAttemptID getSchedulingCausalTA() { + return schedulingCausalTA; + } TaskSpec createRemoteTaskSpec() throws AMUserCodeException { TaskSpec baseTaskSpec = task.getBaseTaskSpec(); @@ -716,6 +735,8 @@ public class TaskAttemptImpl implements TaskAttempt, { TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent; this.launchTime = tEvent.getStartTime(); + this.scheduledTime = tEvent.getScheduledTime(); + this.schedulingCausalTA = tEvent.getSchedulingCausalTA(); recoveryStartEventSeen = true; recoveredState = TaskAttemptState.RUNNING; 
this.containerId = tEvent.getContainerId(); @@ -936,7 +957,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), launchTime, containerId, containerNodeId, - inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); + inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, scheduledTime, schedulingCausalTA); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } @@ -1076,7 +1097,7 @@ public class TaskAttemptImpl implements TaskAttempt, .getTaskAttemptState()); // Send out events to the Task - indicating TaskAttemptTermination(F/K) ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper - .getTaskEventType())); + .getTaskEventType(), event)); } } http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index ef8e33a..e6027f5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -70,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; @@ -549,7 +550,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) { - 
TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId()); + TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId(), null); return taskAttempt; } @@ -814,10 +815,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } - TaskAttemptImpl createAttempt(int attemptNumber) { + TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - (failedAttempts > 0), taskResource, containerContext, leafVertex, this); + (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA); } @Override @@ -834,8 +835,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } // This is always called in the Write Lock - private void addAndScheduleAttempt() { - TaskAttempt attempt = createAttempt(attempts.size()); + private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) { + TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA); if (LOG.isDebugEnabled()) { LOG.debug("Created attempt " + attempt.getID()); } @@ -1048,7 +1049,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event; task.locationHint = scheduleEvent.getTaskLocationHint(); task.baseTaskSpec = scheduleEvent.getBaseTaskSpec(); - task.addAndScheduleAttempt(); + // For now, initial scheduling dependency is due to vertex manager scheduling + task.addAndScheduleAttempt(null); task.scheduledTime = task.clock.getTime(); task.logJobHistoryTaskStartedEvent(); } @@ -1066,7 +1068,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public void transition(TaskImpl task, TaskEvent event) { LOG.info("Scheduling a redundant attempt for task " + task.taskId); task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1); - task.addAndScheduleAttempt(); + 
TezTaskAttemptID earliestUnfinishedAttempt = null; + for (TaskAttempt ta : task.attempts.values()) { + // find the oldest running attempt + if (!ta.isFinished()) { + earliestUnfinishedAttempt = ta.getID(); + } + } + task.addAndScheduleAttempt(earliestUnfinishedAttempt); } } @@ -1143,9 +1152,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // we KillWaitAttemptCompletedTransitionready have a spare task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true); task.getVertex().incrementKilledTaskAttemptCount(); - if (task.getUncompletedAttemptsCount() == 0 - && task.successfulAttempt == null) { - task.addAndScheduleAttempt(); + if (task.shouldScheduleNewAttempt()) { + task.addAndScheduleAttempt(castEvent.getTaskAttemptID()); } } } @@ -1255,7 +1263,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // If any incomplete, the running attempt will moved to failed and its // update will trigger a new attempt if possible if (task.attempts.size() == task.getFinishedAttemptsCount()) { - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(null); } endState = TaskStateInternal.RUNNING; break; @@ -1304,15 +1312,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { return task.getInternalState(); } } + + private boolean shouldScheduleNewAttempt() { + return (getUncompletedAttemptsCount() == 0 + && successfulAttempt == null); + } private static class AttemptFailedTransition implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { + private TezTaskAttemptID schedulingCausalTA; + @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; task.getVertex().incrementFailedTaskAttemptCount(); TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; + schedulingCausalTA = castEvent.getTaskAttemptID(); task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed," + " info=" + 
task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics()); if (task.commitAttempt != null && @@ -1327,12 +1343,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { ((TaskEventTAUpdate) event).getTaskAttemptID(), TaskAttemptStateInternal.FAILED); // we don't need a new event if we already have a spare - if (task.getUncompletedAttemptsCount() == 0 - && task.successfulAttempt == null) { + if (task.shouldScheduleNewAttempt()) { LOG.info("Scheduling new attempt for task: " + task.getTaskId() + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: " + task.maxFailedAttempts); - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(getSchedulingCausalTA()); } } else { LOG.info("Failing task: " + task.getTaskId() @@ -1352,11 +1367,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { protected TaskStateInternal getDefaultState(TaskImpl task) { return task.getInternalState(); } + + protected TezTaskAttemptID getSchedulingCausalTA() { + return schedulingCausalTA; + } } private static class TaskRetroactiveFailureTransition extends AttemptFailedTransition { + private TezTaskAttemptID schedulingCausalTA; + @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { if (task.leafVertex) { @@ -1386,6 +1407,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // succeeded state return TaskStateInternal.SUCCEEDED; } + + Preconditions.checkState(castEvent.getCausalEvent() != null); + TaskAttemptEventOutputFailed destinationEvent = + (TaskAttemptEventOutputFailed) castEvent.getCausalEvent(); + schedulingCausalTA = destinationEvent.getInputFailedEvent().getSourceInfo().getTaskAttemptID(); // super.transition is mostly coded for the case where an // UNcompleted task failed. 
When a COMPLETED task retroactively @@ -1402,6 +1428,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { return returnState; } + + @Override + protected TezTaskAttemptID getSchedulingCausalTA() { + return schedulingCausalTA; + } @Override protected TaskStateInternal getDefaultState(TaskImpl task) { @@ -1433,7 +1464,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(attemptId); return TaskStateInternal.SCHEDULED; } else { // nothing to do http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index 36add86..8eb074d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -36,24 +36,28 @@ public class TaskAttemptStartedEvent implements HistoryEvent { private String inProgressLogsUrl; private String completedLogsUrl; private String vertexName; - private long startTime; + private long launchTime; private ContainerId containerId; private NodeId nodeId; private String nodeHttpAddress; + private TezTaskAttemptID schedulingCausalTA; + private long scheduledTime; public TaskAttemptStartedEvent(TezTaskAttemptID taId, - String vertexName, long startTime, + String vertexName, long launchTime, ContainerId containerId, NodeId nodeId, String inProgressLogsUrl, String completedLogsUrl, - String nodeHttpAddress) { 
+ String nodeHttpAddress, long scheduledTime, TezTaskAttemptID schedulingCausalTA) { this.taskAttemptId = taId; this.vertexName = vertexName; - this.startTime = startTime; + this.launchTime = launchTime; this.containerId = containerId; this.nodeId = nodeId; this.inProgressLogsUrl = inProgressLogsUrl; this.completedLogsUrl = completedLogsUrl; this.nodeHttpAddress = nodeHttpAddress; + this.scheduledTime = scheduledTime; + this.schedulingCausalTA = schedulingCausalTA; } public TaskAttemptStartedEvent() { @@ -75,19 +79,27 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } public TaskAttemptStartedProto toProto() { - return TaskAttemptStartedProto.newBuilder() - .setTaskAttemptId(taskAttemptId.toString()) - .setStartTime(startTime) + TaskAttemptStartedProto.Builder builder = TaskAttemptStartedProto.newBuilder(); + builder.setTaskAttemptId(taskAttemptId.toString()) + .setStartTime(launchTime) .setContainerId(containerId.toString()) .setNodeId(nodeId.toString()) - .build(); + .setScheduledTime(scheduledTime); + if (schedulingCausalTA != null) { + builder.setSchedulingCausalTA(schedulingCausalTA.toString()); + } + return builder.build(); } public void fromProto(TaskAttemptStartedProto proto) { this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); - this.startTime = proto.getStartTime(); + this.launchTime = proto.getStartTime(); this.containerId = ConverterUtils.toContainerId(proto.getContainerId()); this.nodeId = ConverterUtils.toNodeId(proto.getNodeId()); + this.scheduledTime = proto.getScheduledTime(); + if (proto.hasSchedulingCausalTA()) { + this.schedulingCausalTA = TezTaskAttemptID.fromString(proto.getSchedulingCausalTA()); + } } @Override @@ -108,7 +120,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent { public String toString() { return "vertexName=" + vertexName + ", taskAttemptId=" + taskAttemptId - + ", startTime=" + startTime + + ", scheduledTime=" + scheduledTime + + ", startTime=" + launchTime + ", 
containerId=" + containerId + ", nodeId=" + nodeId + ", inProgressLogs=" + inProgressLogsUrl @@ -120,7 +133,15 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } public long getStartTime() { - return startTime; + return launchTime; + } + + public long getScheduledTime() { + return scheduledTime; + } + + public TezTaskAttemptID getSchedulingCausalTA() { + return schedulingCausalTA; } public ContainerId getContainerId() { http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 07ce2f3..3fdfe0a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -573,6 +573,10 @@ public class HistoryEventJsonConversion { JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); + if (event.getSchedulingCausalTA() != null) { + otherInfo.put(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, event.getSchedulingCausalTA().toString()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index 8af48b6..402349b 100644 --- 
a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -164,6 +164,8 @@ message TaskAttemptStartedProto { optional int64 start_time = 2; optional string container_id = 3; optional string node_id = 4; + optional int64 scheduled_time = 5; + optional string scheduling_causal_t_a = 6; } message TaskAttemptFinishedProto { http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index 0665b1e..d632aa3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -155,7 +155,7 @@ public class TestTaskAttemptRecovery { private void restoreFromTAStartEvent() { TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); assertEquals(startTime, ta.getLaunchTime()); assertEquals(TaskAttemptState.RUNNING, recoveredState); } http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 9e5d395..807f277 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -55,6 +55,7 @@ import 
org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; @@ -71,6 +72,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -358,7 +360,10 @@ public class TestTaskImpl { LOG.info("--- START: testKillScheduledTaskAttempt ---"); TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); killScheduledTaskAttempt(mockTask.getLastAttempt().getID()); + // last killed attempt should be causal TA of next attempt + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); } @Test(timeout = 5000) @@ -382,8 +387,11 @@ public class TestTaskImpl { LOG.info("--- START: testKillRunningTaskAttempt ---"); TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); launchTaskAttempt(mockTask.getLastAttempt().getID()); killRunningTaskAttempt(mockTask.getLastAttempt().getID()); + // last killed attempt should be causal TA of next attempt + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); } /** @@ -504,11 +512,15 @@ public class TestTaskImpl { // During the task attempt commit there is an exception which causes // the attempt to fail + TezTaskAttemptID lastTAId = 
mockTask.getLastAttempt().getID(); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED); + assertEquals(1, mockTask.getAttemptList().size()); failRunningTaskAttempt(mockTask.getLastAttempt().getID()); assertEquals(2, mockTask.getAttemptList().size()); assertEquals(1, mockTask.failedAttempts); + // last failed attempt should be the causal TA + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); assertFalse("First attempt should not commit", mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); @@ -552,6 +564,7 @@ public class TestTaskImpl { scheduleTaskAttempt(taskId); launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID(); // Add a speculative task attempt that succeeds mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), @@ -559,6 +572,11 @@ public class TestTaskImpl { launchTaskAttempt(mockTask.getLastAttempt().getID()); updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + assertEquals(2, mockTask.getAttemptList().size()); + + // previous running attempt should be the causal TA of this speculative attempt + Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA()); + assertTrue("Second attempt should commit", mockTask.canCommit(mockTask.getAttemptList().get(1).getID())); assertFalse("First attempt should not commit", @@ -601,8 +619,14 @@ public class TestTaskImpl { eventHandler.events.clear(); // Now fail the attempt after it has succeeded + TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class); + TezEvent mockTezEvent = mock(TezEvent.class); + EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId); + when(mockTezEvent.getSourceInfo()).thenReturn(meta); + TaskAttemptEventOutputFailed outputFailedEvent = + new TaskAttemptEventOutputFailed(mockDestId, 
mockTezEvent, 1); mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt() - .getID(), TaskEventType.T_ATTEMPT_FAILED)); + .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent)); // The task should still be in the scheduled state assertTaskScheduledState(); @@ -610,6 +634,12 @@ public class TestTaskImpl { Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType()); event = eventHandler.events.get(eventHandler.events.size()-1); Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType()); + + // report of output read error should be the causal TA + List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList(); + Assert.assertEquals(2, attempts.size()); + MockTaskAttemptImpl newAttempt = attempts.get(1); + Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA()); } @Test(timeout = 5000) @@ -679,11 +709,11 @@ public class TestTaskImpl { } @Override - protected TaskAttemptImpl createAttempt(int attemptNumber) { + protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - true, taskResource, containerContext); + true, taskResource, containerContext, schedCausalTA); taskAttempts.add(attempt); return attempt; } @@ -730,9 +760,10 @@ public class TestTaskImpl { EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean isRescheduled, - Resource resource, ContainerContext containerContext) { + Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, - appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class)); + appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), + schedCausalTA); } @Override 
http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index f43f52c..feb290f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -215,7 +215,7 @@ public class TestTaskRecovery { long taStartTime = taskStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(0, task.getFinishedAttemptsCount()); assertEquals(taskScheduledTime, task.scheduledTime); @@ -721,7 +721,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -774,7 +774,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); task.restoreFromEvent(new 
TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.KILLED, null, "", null)); } @@ -804,7 +804,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.FAILED, null, "", null)); } @@ -834,7 +834,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts - 1; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.FAILED, null, "", null)); } @@ -844,7 +844,7 @@ public class TestTaskRecovery { TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId, - vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index b52a4f9..9be3531 100644 --- 
a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -478,7 +478,10 @@ public class TestHistoryEventsProtoConversion { "vertex1", 10009l, ContainerId.newInstance( ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( - "host1", 19999), "inProgress", "Completed", "nodeHttpAddress"); + "host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024, + TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0) + ); TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -489,6 +492,10 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getNodeId()); Assert.assertEquals(event.getStartTime(), deserializedEvent.getStartTime()); + Assert.assertEquals(event.getScheduledTime(), + deserializedEvent.getScheduledTime()); + Assert.assertEquals(event.getSchedulingCausalTA(), + deserializedEvent.getSchedulingCausalTA()); logEvents(event, deserializedEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 3ab204a..db871a2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -160,7 +160,7 @@ public class TestHistoryEventJsonConversion { break; 
case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress"); + nodeId, null, null, "nodeHttpAddress", 0, null); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index 4c3fa97..cca984a 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -45,7 +45,7 @@ public class TaskAttemptInfo extends BaseInfo { private final String nodeId; private final String status; private final String logUrl; - + private final String schedulingCausalTA; private TaskInfo taskInfo; private Container container; @@ -66,6 +66,7 @@ public class TaskAttemptInfo extends BaseInfo { diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); successfulAttemptId = otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID); scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME); + schedulingCausalTA = otherInfoNode.optString(Constants.SCHEDULING_CAUSAL_ATTEMPT); containerId = otherInfoNode.optString(Constants.CONTAINER_ID); String id = otherInfoNode.optString(Constants.NODE_ID); @@ -111,6 +112,10 @@ public class TaskAttemptInfo extends BaseInfo { public final long getScheduledTime() { return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime()); } + + public final String 
getSchedulingCausalTA() { + return schedulingCausalTA; + } @Override public final String getDiagnostics() { http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java index faff182..1d59e98 100644 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java @@ -240,6 +240,10 @@ public class TestATSFileParser { assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0); assertTrue(taskInfo.getFailedTaskAttempts().size() == 0); assertTrue(taskInfo.getKilledTaskAttempts().size() == 0); + for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { + assertTrue(attemptInfo.getStartTime() > 0); + assertTrue(attemptInfo.getScheduledTime() > 0); + } } assertTrue(vertexInfo.getLastTaskToFinish() != null); if (vertexInfo.getVertexName().equals(TOKENIZER)) { @@ -326,6 +330,17 @@ public class TestATSFileParser { 20); //Every line has 2 words. 10 lines x 2 words = 20 verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()), "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above + + for (TaskInfo taskInfo : summationVertex.getTasks()) { + String lastAttemptId = null; + for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { + if (lastAttemptId != null) { + // failed attempt should be causal TA of next attempt + assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA())); + } + lastAttemptId = attemptInfo.getTaskAttemptId(); + } + } //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated. 
//TaskCounter.REDUCE_INPUT_RECORDS http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 77f4dd1..95f77e2 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -467,6 +467,11 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); + atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); + if (event.getSchedulingCausalTA() != null) { + atsEntity.addOtherInfo(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, + event.getSchedulingCausalTA().toString()); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index e324d1b..bf8d0ec 100644 
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -165,7 +165,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress"); + nodeId, null, null, "nodeHttpAddress", 0, null); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), @@ -724,8 +724,10 @@ public class TestHistoryEventTimelineConversion { @Test(timeout = 5000) public void testConvertTaskAttemptStartedEvent() { long startTime = random.nextLong(); + long scheduleTime = 1024; TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", - startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); + startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress", + scheduleTime, tezTaskAttemptID); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -770,6 +772,9 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS)); Assert.assertTrue(TaskAttemptState.RUNNING.name() .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); + Assert.assertEquals(tezTaskAttemptID.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT)); + Assert.assertEquals(scheduleTime, timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)); } @Test(timeout = 5000)
