Repository: tez Updated Branches: refs/heads/master b8e8bcbd0 -> 6b67b0bc1
TEZ-2701. Add time at which container was allocated to attempt (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6b67b0bc Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6b67b0bc Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6b67b0bc Branch: refs/heads/master Commit: 6b67b0bc1eb010f6dc8af2936ae738909e1244ff Parents: b8e8bcb Author: Bikas Saha <[email protected]> Authored: Thu Aug 13 17:53:52 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Aug 13 17:53:52 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 5 +- .../org/apache/tez/common/ATSConstants.java | 4 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 52 ++++++++++++++++---- .../tez/dag/app/rm/container/AMContainer.java | 1 + .../dag/app/rm/container/AMContainerImpl.java | 12 +++++ .../history/events/TaskAttemptStartedEvent.java | 41 +++++++++------ .../impl/HistoryEventJsonConversion.java | 7 +-- tez-dag/src/main/proto/HistoryEvents.proto | 5 +- .../app/dag/impl/TestTaskAttemptRecovery.java | 12 +++-- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 12 ++--- .../dag/app/rm/container/TestAMContainer.java | 5 +- .../TestHistoryEventsProtoConversion.java | 12 +++-- .../impl/TestHistoryEventJsonConversion.java | 2 +- .../parser/datamodel/TaskAttemptInfo.java | 37 +++++++++----- .../apache/tez/history/TestHistoryParser.java | 17 ++++--- .../ats/HistoryEventTimelineConversion.java | 9 ++-- .../ats/TestHistoryEventTimelineConversion.java | 12 +++-- 17 files changed, 167 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 432c82b..bbe9321 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,14 +8,15 @@ INCOMPATIBLE CHANGES TEZ-2048. Remove VertexManagerPluginContext.getTaskContainer() TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead. TEZ-2468. Change the minimum Java version to Java 7. + +ALL CHANGES: TEZ-2646. Add scheduling casual dependency for attempts TEZ-2647. Add input causality dependency for attempts TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks TEZ-2650. Timing details on Vertex state changes TEZ-2699. Internalize strings in ATF parser - -ALL CHANGES: + TEZ-2701. Add time at which container was allocated to attempt TEZ-2683. TestHttpConnection::testAsyncHttpConnectionInterrupt fails in certain environments. TEZ-2692. bugfixes & enhancements related to job parser and analyzer. TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 1568b96..4566a91 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -57,6 +57,8 @@ public class ATSConstants { public static final String VERTEX_NAME = "vertexName"; public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping"; public static final String SCHEDULED_TIME = "scheduledTime"; + public static final String CREATION_TIME = "creationTime"; + public static final String ALLOCATION_TIME = "allocationTime"; public static final String INIT_REQUESTED_TIME = "initRequestedTime"; public static final String INIT_TIME = "initTime"; public static final String START_REQUESTED_TIME = "startRequestedTime"; @@ -84,7 +86,7 @@ public class ATSConstants { public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime"; public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA"; public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers"; - public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt"; + public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt"; /* Counters-related keys */ public static final String COUNTER_GROUPS = "counterGroups"; http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index ebf7c58..e5a6f84 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -86,6 +86,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate; import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; +import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -138,6 +139,7 @@ public class TaskAttemptImpl implements TaskAttempt, // TODO Can these be replaced by the container object TEZ-1037 private Container container; + private long allocationTime; private ContainerId containerId; private NodeId containerNodeId; private String nodeHttpAddress; @@ -170,8 +172,8 @@ public class TaskAttemptImpl implements TaskAttempt, private final ContainerContext containerContext; private final boolean leafVertex; - private TezTaskAttemptID schedulingCausalTA; - private long scheduledTime; + private TezTaskAttemptID creationCausalTA; + private long creationTime; protected static final FailedTransitionHelper FAILED_HELPER = new FailedTransitionHelper(); @@ -411,8 +413,8 @@ public class TaskAttemptImpl implements TaskAttempt, this.appContext = appContext; this.task = task; this.vertex = this.task.getVertex(); - this.schedulingCausalTA = schedulingCausalTA; - this.scheduledTime = clock.getTime(); + this.creationCausalTA = schedulingCausalTA; + this.creationTime = clock.getTime(); this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); @@ -446,7 +448,7 @@ public class TaskAttemptImpl implements TaskAttempt, } public TezTaskAttemptID getSchedulingCausalTA() { - return schedulingCausalTA; + return creationCausalTA; } TaskSpec createRemoteTaskSpec() throws AMUserCodeException { @@ -646,6 +648,33 @@ public class TaskAttemptImpl implements TaskAttempt, } } + public long getCreationTime() { + readLock.lock(); + try { + return creationTime; + } finally { + readLock.unlock(); + } + } + + public TezTaskAttemptID getCreationCausalAttempt() { + readLock.lock(); + try { + return creationCausalTA; + } finally { + readLock.unlock(); + } + } + + public long getAllocationTime() { + readLock.lock(); + try { + return allocationTime; + } finally { + readLock.unlock(); + } + } + @Override public long getFinishTime() { readLock.lock(); @@ -739,8 +768,9 @@ public class TaskAttemptImpl implements TaskAttempt, { TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent; this.launchTime = tEvent.getStartTime(); - this.scheduledTime = tEvent.getScheduledTime(); - this.schedulingCausalTA = tEvent.getSchedulingCausalTA(); + this.creationTime = tEvent.getCreationTime(); + this.allocationTime = tEvent.getAllocationTime(); + this.creationCausalTA = tEvent.getCreationCausalTA(); recoveryStartEventSeen = true; recoveredState = TaskAttemptState.RUNNING; this.containerId = tEvent.getContainerId(); @@ -963,7 +993,8 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), launchTime, containerId, containerNodeId, - inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, scheduledTime, schedulingCausalTA); + inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, creationTime, creationCausalTA, + allocationTime); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } @@ -1114,9 +1145,10 @@ public class TaskAttemptImpl implements TaskAttempt, public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent; - Container container = ta.appContext.getAllContainers() - .get(event.getContainerId()).getContainer(); + AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); + Container container = amContainer.getContainer(); + ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; ta.containerId = event.getContainerId(); ta.containerNodeId = container.getNodeId(); http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index a6b403d..7d6da8a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -32,5 +32,6 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{ public Container getContainer(); public List<TezTaskAttemptID> getAllTaskAttempts(); public TezTaskAttemptID getCurrentTaskAttempt(); + public long getCurrentTaskAttemptAllocationTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 330f2b7..9b90752 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -95,6 +95,7 @@ public class AMContainerImpl implements AMContainer { private boolean nodeFailed = false; private TezTaskAttemptID currentAttempt; + private long currentAttemptAllocationTime; private List<TezTaskAttemptID> failedAssignments; private boolean inError = false; @@ -362,6 +363,16 @@ public class AMContainerImpl implements AMContainer { } } + @Override + public long getCurrentTaskAttemptAllocationTime() { + readLock.lock(); + try { + return this.currentAttemptAllocationTime; + } finally { + readLock.unlock(); + } + } + public boolean isInErrorState() { return inError; } @@ -532,6 +543,7 @@ public class AMContainerImpl implements AMContainer { // Register the additional resources back for this container. container.containerLocalResources.putAll(container.additionalLocalResources); container.currentAttempt = event.getTaskAttemptId(); + container.currentAttemptAllocationTime = container.appContext.getClock().getTime(); if (LOG.isDebugEnabled()) { LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec()); LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources); http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index 8eb074d..4d15fb9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -40,14 +40,15 @@ public class TaskAttemptStartedEvent implements HistoryEvent { private ContainerId containerId; private NodeId nodeId; private String nodeHttpAddress; - private TezTaskAttemptID schedulingCausalTA; - private long scheduledTime; + private TezTaskAttemptID creationCausalTA; + private long creationTime; + private long allocationTime; public TaskAttemptStartedEvent(TezTaskAttemptID taId, String vertexName, long launchTime, ContainerId containerId, NodeId nodeId, String inProgressLogsUrl, String completedLogsUrl, - String nodeHttpAddress, long scheduledTime, TezTaskAttemptID schedulingCausalTA) { + String nodeHttpAddress, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) { this.taskAttemptId = taId; this.vertexName = vertexName; this.launchTime = launchTime; @@ -56,8 +57,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent { this.inProgressLogsUrl = inProgressLogsUrl; this.completedLogsUrl = completedLogsUrl; this.nodeHttpAddress = nodeHttpAddress; - this.scheduledTime = scheduledTime; - this.schedulingCausalTA = schedulingCausalTA; + this.creationTime = creationTime; + this.creationCausalTA = creationCausalTA; + this.allocationTime = allocationTime; } public TaskAttemptStartedEvent() { @@ -84,9 +86,10 @@ public class TaskAttemptStartedEvent implements HistoryEvent { .setStartTime(launchTime) .setContainerId(containerId.toString()) .setNodeId(nodeId.toString()) - .setScheduledTime(scheduledTime); - if (schedulingCausalTA != null) { - builder.setSchedulingCausalTA(schedulingCausalTA.toString()); + .setCreationTime(creationTime) + .setAllocationTime(allocationTime); + if (creationCausalTA != null) { + builder.setCreationCausalTA(creationCausalTA.toString()); } return builder.build(); } @@ -96,9 +99,10 @@ public class TaskAttemptStartedEvent implements HistoryEvent { this.launchTime = proto.getStartTime(); this.containerId = ConverterUtils.toContainerId(proto.getContainerId()); this.nodeId = ConverterUtils.toNodeId(proto.getNodeId()); - this.scheduledTime = proto.getScheduledTime(); - if (proto.hasSchedulingCausalTA()) { - this.schedulingCausalTA = TezTaskAttemptID.fromString(proto.getSchedulingCausalTA()); + this.creationTime = proto.getCreationTime(); + this.allocationTime = proto.getAllocationTime(); + if (proto.hasCreationCausalTA()) { + this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA()); } } @@ -120,7 +124,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent { public String toString() { return "vertexName=" + vertexName + ", taskAttemptId=" + taskAttemptId - + ", scheduledTime=" + scheduledTime + + ", creationTime=" + creationTime + + ", allocationTime=" + allocationTime + ", startTime=" + launchTime + ", containerId=" + containerId + ", nodeId=" + nodeId @@ -136,12 +141,16 @@ public class TaskAttemptStartedEvent implements HistoryEvent { return launchTime; } - public long getScheduledTime() { - return scheduledTime; + public long getCreationTime() { + return creationTime; } - public TezTaskAttemptID getSchedulingCausalTA() { - return schedulingCausalTA; + public long getAllocationTime() { + return allocationTime; + } + + public TezTaskAttemptID getCreationCausalTA() { + return creationCausalTA; } public ContainerId getContainerId() { http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 528da10..b32b324 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -577,9 +577,10 @@ public class HistoryEventJsonConversion { JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); - otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); - if (event.getSchedulingCausalTA() != null) { - otherInfo.put(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, event.getSchedulingCausalTA().toString()); + otherInfo.put(ATSConstants.CREATION_TIME, event.getCreationTime()); + otherInfo.put(ATSConstants.ALLOCATION_TIME, event.getAllocationTime()); + if (event.getCreationCausalTA() != null) { + otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT, event.getCreationCausalTA().toString()); } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index ffb382e..e268e0d 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -164,8 +164,9 @@ message TaskAttemptStartedProto { optional int64 start_time = 2; optional string container_id = 3; optional string node_id = 4; - optional int64 scheduled_time = 5; - optional string scheduling_causal_t_a = 6; + optional int64 creation_time = 5; + optional string creation_causal_t_a = 6; + optional int64 allocation_time = 7; } message TaskAttemptFinishedProto { http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index 920109b..4a797e0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -19,7 +19,6 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -71,7 +70,9 @@ public class TestTaskAttemptRecovery { private TaskAttemptImpl ta; private EventHandler mockEventHandler; - private long startTime = System.currentTimeMillis(); + private long creationTime = System.currentTimeMillis(); + private long allocationTime = creationTime + 5000; + private long startTime = allocationTime + 5000; private long finishTime = startTime + 5000; private TezTaskAttemptID taId; @@ -153,9 +154,14 @@ public class TestTaskAttemptRecovery { } private void restoreFromTAStartEvent() { + TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1); TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", creationTime, causalId, + allocationTime)); + assertEquals(causalId, ta.getCreationCausalAttempt()); + assertEquals(creationTime, ta.getCreationTime()); + assertEquals(allocationTime, ta.getAllocationTime()); assertEquals(startTime, ta.getLaunchTime()); assertEquals(TaskAttemptState.RUNNING, recoveredState); } http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index 87e7498..1d22e06 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -215,7 +215,7 @@ public class TestTaskRecovery { long taStartTime = taskStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(0, task.getFinishedAttemptsCount()); assertEquals(taskScheduledTime, task.scheduledTime); @@ -721,7 +721,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -774,7 +774,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.KILLED, null, "", null, 0, null)); } @@ -804,7 +804,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.FAILED, null, "", null, 0, null)); } @@ -834,7 +834,7 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts - 1; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, 0, TaskAttemptState.FAILED, null, "", null, 0, null)); } @@ -844,7 +844,7 @@ public class TestTaskRecovery { TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId, - vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null)); + vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index fafbba6..f9a1c5e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -108,11 +108,14 @@ public class TestAMContainer { assertNull(wc.amContainer.getCurrentTaskAttempt()); // Assign task. + long currTime = wc.appContext.getClock().getTime(); wc.assignTaskAttempt(wc.taskAttemptID); wc.verifyState(AMContainerState.LAUNCHING); wc.verifyNoOutgoingEvents(); assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); - + assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() > 0); + assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() >= currTime); + // Container Launched wc.containerLaunched(); wc.verifyState(AMContainerState.RUNNING); http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index a32cc27..3507d99 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -480,7 +480,7 @@ public class TestHistoryEventsProtoConversion { ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( "host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024, TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0) + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024 ); TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent) testProtoConversion(event); @@ -492,10 +492,12 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getNodeId()); Assert.assertEquals(event.getStartTime(), deserializedEvent.getStartTime()); - Assert.assertEquals(event.getScheduledTime(), - deserializedEvent.getScheduledTime()); - Assert.assertEquals(event.getSchedulingCausalTA(), - deserializedEvent.getSchedulingCausalTA()); + Assert.assertEquals(event.getCreationTime(), + deserializedEvent.getCreationTime()); + Assert.assertEquals(event.getAllocationTime(), + deserializedEvent.getAllocationTime()); + Assert.assertEquals(event.getCreationCausalTA(), + deserializedEvent.getCreationCausalTA()); logEvents(event, deserializedEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index ec1603e..9c11dc7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -160,7 +160,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress", 0, null); + nodeId, null, null, "nodeHttpAddress", 0, null, 0); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index 916df95..ba676a2 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -43,14 +43,16 @@ public class TaskAttemptInfo extends BaseInfo { private final long endTime; private final String diagnostics; - private final long scheduledTime; + private final long creationTime; + private final long allocationTime; private final String containerId; private final String nodeId; private final String status; private final String logUrl; - private final String schedulingCausalTA; + private final String creationCausalTA; private final long lastDataEventTime; private final String lastDataEventSourceTA; + private final String terminationCause; private TaskInfo taskInfo; @@ -70,10 +72,10 @@ public class TaskAttemptInfo extends BaseInfo { startTime = otherInfoNode.optLong(Constants.START_TIME); endTime = otherInfoNode.optLong(Constants.FINISH_TIME); diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); - scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME); - schedulingCausalTA = StringInterner.weakIntern( - otherInfoNode.optString(Constants.SCHEDULING_CAUSAL_ATTEMPT)); - + creationTime = otherInfoNode.optLong(Constants.CREATION_TIME); + creationCausalTA = StringInterner.weakIntern( + otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT)); + allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME); containerId = StringInterner.weakIntern(otherInfoNode.optString(Constants.CONTAINER_ID)); String id = otherInfoNode.optString(Constants.NODE_ID); nodeId = StringInterner.weakIntern((id != null) ? (id.split(":")[0]) : ""); @@ -84,6 +86,8 @@ public class TaskAttemptInfo extends BaseInfo { lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME); lastDataEventSourceTA = StringInterner.weakIntern( otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA)); + terminationCause = StringInterner + .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); } void setTaskInfo(TaskInfo taskInfo) { @@ -110,8 +114,8 @@ public class TaskAttemptInfo extends BaseInfo { return endTime; } - public final long getScheduledTime() { - return scheduledTime; + public final long getCreationTime() { + return creationTime; } public final long getLastDataEventTime() { @@ -126,19 +130,26 @@ public class TaskAttemptInfo extends BaseInfo { return getFinishTimeInterval() - getStartTimeInterval(); } - public final long getScheduledTimeInterval() { - return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); + public final long getCreationTimeInterval() { + return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); } - public final String getSchedulingCausalTA() { - return schedulingCausalTA; + public final String getCreationCausalTA() { + return creationCausalTA; } + public final long getAllocationTime() { + return allocationTime; + } @Override public final String getDiagnostics() { return diagnostics; } + + public final String getTerminationCause() { + return terminationCause; + } public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException { return new TaskAttemptInfo(taskInfoObject); @@ -254,7 +265,7 @@ public class TaskAttemptInfo extends BaseInfo { StringBuilder sb = new StringBuilder(); sb.append("["); sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", "); - sb.append("scheduledTime=").append(getScheduledTimeInterval()).append(", "); + sb.append("creationTime=").append(getCreationTimeInterval()).append(", "); sb.append("startTime=").append(getStartTimeInterval()).append(", "); sb.append("finishTime=").append(getFinishTimeInterval()).append(", "); sb.append("timeTaken=").append(getTimeTaken()).append(", "); http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java index c89acb2..2b797a5 100644 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java @@ -77,6 +77,7 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.tests.MiniTezClusterWithTimeline; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -297,8 +298,9 @@ public class TestHistoryParser { } } for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { - assertTrue(attemptInfo.getStartTimeInterval() > 0); - assertTrue(attemptInfo.getScheduledTimeInterval() > 0); + assertTrue(attemptInfo.getCreationTime() > 0); + assertTrue(attemptInfo.getAllocationTime() > 0); + assertTrue(attemptInfo.getStartTime() > 0); } } assertTrue(vertexInfo.getLastTaskToFinish() != null); @@ -389,13 +391,14 @@ public class TestHistoryParser { "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above for (TaskInfo taskInfo : summationVertex.getTasks()) { - String lastAttemptId = null; + TaskAttemptInfo lastAttempt = null; for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { - if (lastAttemptId != null) { + if (lastAttempt != null) { // failed attempt should be causal TA of next attempt - assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA())); + assertTrue(lastAttempt.getTaskAttemptId().equals(attemptInfo.getCreationCausalTA())); + assertTrue(lastAttempt.getTerminationCause() != null); } - lastAttemptId = attemptInfo.getTaskAttemptId(); + lastAttempt = attemptInfo; } } @@ -769,6 +772,8 @@ public class TestHistoryParser { .equals(TaskAttemptState.SUCCEEDED)) { assertTrue(attemptInfo.getStartTimeInterval() > 0); assertTrue(attemptInfo.getFinishTimeInterval() > 0); + assertTrue(attemptInfo.getCreationTime() > 0); + assertTrue(attemptInfo.getAllocationTime() > 0); assertTrue(attemptInfo.getStartTime() > 0); assertTrue(attemptInfo.getFinishTime() > 0); assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime()); http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index eaed115..b979402 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -471,10 +471,11 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); - atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); - if (event.getSchedulingCausalTA() != null) { - atsEntity.addOtherInfo(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, - event.getSchedulingCausalTA().toString()); + atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime()); + atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime()); + if (event.getCreationCausalTA() != null) { + atsEntity.addOtherInfo(ATSConstants.CREATION_CAUSAL_ATTEMPT, + event.getCreationCausalTA().toString()); } return atsEntity; http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 838d9d6..75828c3 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -165,7 +165,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress", 0, null); + nodeId, null, null, "nodeHttpAddress", 0, null, 0); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), @@ -727,10 +727,11 @@ public class TestHistoryEventTimelineConversion { @Test(timeout = 5000) public void testConvertTaskAttemptStartedEvent() { long startTime = random.nextLong(); - long scheduleTime = 1024; + long creationTime = 1024; + long allocationTime = 1024; TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress", - scheduleTime, tezTaskAttemptID); + creationTime, tezTaskAttemptID, allocationTime); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -776,8 +777,9 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue(TaskAttemptState.RUNNING.name() .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); Assert.assertEquals(tezTaskAttemptID.toString(), - timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT)); - Assert.assertEquals(scheduleTime, timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)); + timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT)); + Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME)); + Assert.assertEquals(allocationTime, timelineEntity.getOtherInfo().get(ATSConstants.ALLOCATION_TIME)); } @Test(timeout = 5000)
