Repository: tez Updated Branches: refs/heads/master eda9a47ea -> e8269c270
TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e8269c27 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e8269c27 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e8269c27 Branch: refs/heads/master Commit: e8269c27077c1709cd614d97338b5ad8d035f507 Parents: eda9a47 Author: Jonathan Eagles <[email protected]> Authored: Tue Mar 8 17:26:23 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Tue Mar 8 17:27:40 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../TaskAttemptEventContainerTerminated.java | 11 ++- ...AttemptEventContainerTerminatedBySystem.java | 12 ++- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 89 +++++++++++++++----- .../dag/app/rm/container/AMContainerImpl.java | 4 +- .../events/TaskAttemptFinishedEvent.java | 63 +++++++++++++- .../impl/HistoryEventJsonConversion.java | 16 ++++ tez-dag/src/main/proto/HistoryEvents.proto | 3 + .../apache/tez/dag/app/TestRecoveryParser.java | 2 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 14 +-- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 8 +- .../TestHistoryEventsProtoConversion.java | 24 +++++- .../impl/TestHistoryEventJsonConversion.java | 3 +- .../ats/HistoryEventTimelineConversion.java | 16 ++++ .../ats/TestHistoryEventTimelineConversion.java | 12 ++- 15 files changed, 234 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 02232de..d01c732 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch TEZ-3140. Reduce AM memory usage during serialization TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception. TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. @@ -399,6 +400,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch TEZ-3140. Reduce AM memory usage during serialization TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. TEZ-3115. Shuffle string handling adds significant memory overhead http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java index 5dd0141..3db2ffc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java @@ -17,22 +17,29 @@ package org.apache.tez.dag.app.dag.event; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent { + private final ContainerId containerId; private final String message; private final TaskAttemptTerminationCause errorCause; - public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message, - TaskAttemptTerminationCause errCause) { + public TaskAttemptEventContainerTerminated(ContainerId containerId, TezTaskAttemptID id, + String message, TaskAttemptTerminationCause errCause) { super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED); + this.containerId = containerId; this.message = message; this.errorCause = errCause; } + public ContainerId getContainerId() { + return containerId; + } + @Override public String getDiagnosticInfo() { return message; http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java index a3c57e4..4efbf88 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java @@ -18,21 +18,29 @@ package org.apache.tez.dag.app.dag.event; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent { + private final ContainerId containerId; private final String diagnostics; private final TaskAttemptTerminationCause errorCause; - public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics, - TaskAttemptTerminationCause errorCause) { + + public TaskAttemptEventContainerTerminatedBySystem(ContainerId containerId, TezTaskAttemptID id, + String diagnostics, TaskAttemptTerminationCause errorCause) { super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + this.containerId = containerId; this.diagnostics = diagnostics; this.errorCause = errorCause; } + public ContainerId getContainerId() { + return containerId; + } + @Override public String getDiagnosticInfo() { return diagnostics; http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 0affff2..1598f2d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -78,6 +78,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent; @@ -1037,23 +1038,8 @@ public class TaskAttemptImpl implements TaskAttempt, protected void logJobHistoryAttemptStarted() { Preconditions.checkArgument(recoveryData == null); - final String containerIdStr = containerId.toString(); - String inProgressLogsUrl = nodeHttpAddress - + "/" + "node/containerlogs" - + "/" + containerIdStr - + "/" + this.appContext.getUser(); - String completedLogsUrl = ""; - if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) - && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { - String contextStr = "v_" + getVertex().getName() - + "_" + this.attemptId.toString(); - completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) - + "/" + containerNodeId.toString() - + "/" + containerIdStr - + "/" + contextStr - + "/" + this.appContext.getUser(); - } + String inProgressLogsUrl = getInProgressLogsUrl(); + String completedLogsUrl = getCompletedLogsUrl(); TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), launchTime, containerId, containerNodeId, @@ -1072,7 +1058,8 @@ public class TaskAttemptImpl implements TaskAttempt, attemptId, getVertex().getName(), getLaunchTime(), getFinishTime(), TaskAttemptState.SUCCEEDED, null, "", getCounters(), lastDataEvents, taGeneratedEvents, - creationTime, creationCausalTA, allocationTime); + creationTime, creationCausalTA, allocationTime, + null, null, null, null, null); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1084,8 +1071,16 @@ public class TaskAttemptImpl implements TaskAttempt, || recoveryData.getTaskAttemptFinishedEvent() == null, "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); long finishTime = getFinishTime(); + ContainerId unsuccessfulContainerId = null; + NodeId unsuccessfulContainerNodeId = null; + String inProgressLogsUrl = null; + String completedLogsUrl = null; if (finishTime <= 0) { finishTime = clock.getTime(); // comes here in case it was terminated before launch + unsuccessfulContainerId = containerId; + unsuccessfulContainerNodeId = containerNodeId; + inProgressLogsUrl = getInProgressLogsUrl(); + completedLogsUrl = getCompletedLogsUrl(); } TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), @@ -1093,12 +1088,44 @@ public class TaskAttemptImpl implements TaskAttempt, terminationCause, StringUtils.join( getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, - taGeneratedEvents, creationTime, creationCausalTA, allocationTime); + taGeneratedEvents, creationTime, creationCausalTA, allocationTime, + unsuccessfulContainerId, unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); } + private String getInProgressLogsUrl() { + String inProgressLogsUrl = null; + if (containerId != null && nodeHttpAddress != null) { + final String containerIdStr = containerId.toString(); + inProgressLogsUrl = nodeHttpAddress + + "/" + "node/containerlogs" + + "/" + containerIdStr + + "/" + this.appContext.getUser(); + } + return inProgressLogsUrl; + } + + private String getCompletedLogsUrl() { + String completedLogsUrl = null; + if (containerId != null && containerNodeId != null && nodeHttpAddress != null) { + final String containerIdStr = containerId.toString(); + if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) + && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { + String contextStr = "v_" + getVertex().getName() + + "_" + this.attemptId.toString(); + completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) + + "/" + containerNodeId.toString() + + "/" + containerIdStr + + "/" + contextStr + + "/" + this.appContext.getUser(); + } + } + return completedLogsUrl; + } + ////////////////////////////////////////////////////////////////////////////// // Start of Transition Classes // ////////////////////////////////////////////////////////////////////////////// @@ -1268,6 +1295,30 @@ public class TaskAttemptImpl implements TaskAttempt, + ", eventClass=" + event.getClass().getName()); } + if (event instanceof TaskAttemptEventContainerTerminated) { + TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event; + AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId()); + Container container = amContainer.getContainer(); + + ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); + ta.container = container; + ta.containerId = tEvent.getContainerId(); + ta.containerNodeId = container.getNodeId(); + ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress()); + } + + if (event instanceof TaskAttemptEventContainerTerminatedBySystem) { + TaskAttemptEventContainerTerminatedBySystem tEvent = (TaskAttemptEventContainerTerminatedBySystem) event; + AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId()); + Container container = amContainer.getContainer(); + + ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); + ta.container = container; + ta.containerId = tEvent.getContainerId(); + ta.containerNodeId = container.getNodeId(); + ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress()); + } + if (ta.recoveryData == null || ta.recoveryData.getTaskAttemptFinishedEvent() == null) { ta.setFinishTime(); http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index e4302aa..94c8fe0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -1100,12 +1100,12 @@ public class AMContainerImpl implements AMContainer { protected void sendTerminatedToTaskAttempt( TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) { - sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause)); + sendEvent(new TaskAttemptEventContainerTerminated(containerId, taId, message, errCause)); } protected void sendContainerTerminatedBySystemToTaskAttempt( TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) { - sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause)); + sendEvent(new TaskAttemptEventContainerTerminatedBySystem(containerId, taId, message, errorCause)); } protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId, http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 21b8719..8e31a25 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; @@ -59,7 +62,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private TaskAttemptTerminationCause error; private List<DataEventDependencyInfo> dataEvents; private List<TezEvent> taGeneratedEvents; - + private ContainerId containerId; + private NodeId nodeId; + private String inProgressLogsUrl; + private String completedLogsUrl; + private String nodeHttpAddress; + public TaskAttemptFinishedEvent(TezTaskAttemptID taId, String vertexName, long startTime, @@ -71,7 +79,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { List<TezEvent> taGeneratedEvents, long creationTime, TezTaskAttemptID creationCausalTA, - long allocationTime) { + long allocationTime, + ContainerId containerId, + NodeId nodeId, + String inProgressLogsUrl, + String completedLogsUrl, + String nodeHttpAddress) { this.taskAttemptId = taId; this.vertexName = vertexName; this.creationCausalTA = creationCausalTA; @@ -85,6 +98,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.error = error; this.dataEvents = dataEvents; this.taGeneratedEvents = taGeneratedEvents; + this.containerId = containerId; + this.nodeId = nodeId; + this.inProgressLogsUrl = inProgressLogsUrl; + this.completedLogsUrl = completedLogsUrl; + this.nodeHttpAddress = nodeHttpAddress; } public TaskAttemptFinishedEvent() { @@ -140,6 +158,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { builder.addTaGeneratedEvents(TezEventUtils.toProto(event)); } } + if (containerId != null) { + builder.setContainerId(containerId.toString()); + } + if (nodeId != null) { + builder.setNodeId(nodeId.toString()); + } + if (nodeHttpAddress != null) { + builder.setNodeHttpAddress(nodeHttpAddress); + } return builder.build(); } @@ -175,6 +202,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.taGeneratedEvents.add(TezEventUtils.fromProto(eventProto)); } } + if (proto.hasContainerId()) { + this.containerId = ConverterUtils.toContainerId(proto.getContainerId()); + } + if (proto.hasNodeId()) { + this.nodeId = ConverterUtils.toNodeId(proto.getNodeId()); + } + if (proto.hasNodeHttpAddress()) { + this.nodeHttpAddress = proto.getNodeHttpAddress(); + } } @Override @@ -210,6 +246,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { + ", status=" + state.name() + ", errorEnum=" + (error != null ? error.name() : "") + ", diagnostics=" + diagnostics + + ", containerId=" + (containerId != null ? containerId.toString() : "") + + ", nodeId=" + (nodeId != null ? nodeId.toString() : "") + + ", nodeHttpAddress=" + (nodeHttpAddress != null ? nodeHttpAddress : "") + counterStr; } @@ -256,4 +295,24 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { public List<TezEvent> getTAGeneratedEvents() { return taGeneratedEvents; } + + public ContainerId getContainerId() { + return containerId; + } + + public NodeId getNodeId() { + return nodeId; + } + + public String getInProgressLogsUrl() { + return inProgressLogsUrl; + } + + public String getCompletedLogsUrl() { + return completedLogsUrl; + } + + public String getNodeHttpAddress() { + return nodeHttpAddress; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index c4e7e5b..9bca440 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -555,6 +555,22 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.LAST_DATA_EVENTS, DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents())); } + if (event.getNodeId() != null) { + otherInfo.put(ATSConstants.NODE_ID, event.getNodeId().toString()); + } + if (event.getContainerId() != null) { + otherInfo.put(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + } + if (event.getInProgressLogsUrl() != null) { + otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + } + if (event.getCompletedLogsUrl() != null) { + otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + } + if (event.getNodeHttpAddress() != null) { + otherInfo.put(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + } + jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index b9e4507..f3aeed4 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -190,6 +190,9 @@ message TaskAttemptFinishedProto { optional string error_enum = 10; repeated DataEventDependencyInfoProto data_events = 11; repeated TezEventProto ta_generated_events = 12; + optional string container_id = 13; + optional string node_id = 14; + optional string node_http_address = 15; } message EventMetaDataProto { http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index d8b620a..12e75a7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -703,7 +703,7 @@ public class TestRecoveryParser { TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent( ta0t2v2Id, "v1", 500L, 600L, TaskAttemptState.SUCCEEDED, null, "", null, - null, null, 0L, null, 0L); + null, null, 0L, null, 0L, null, null, null, null, null); rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent)); rService.stop(); http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 6be682d..0a2613c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -883,7 +883,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", 0L, 0L, TaskAttemptState.SUCCEEDED, null, "", null, - null, taGeneratedEvents, 0L, null, 0L); + null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap = new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>(); @@ -941,7 +941,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, - null, null, 0L, null, 0L); + null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -970,7 +970,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, - null, null, 0L, null, 0L); + null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -1030,7 +1030,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, TaskAttemptState.SUCCEEDED, null, "", null, - null, taGeneratedEvents, 0L, null, 0L); + null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -1068,7 +1068,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime, TaskAttemptState.SUCCEEDED, null, "", null, - null, taGeneratedEvents, 0L, null, 0L); + null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id); Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap = @@ -1119,7 +1119,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, - null, null, 0L, null, 0L); + null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -1150,7 +1150,7 @@ public class TestDAGRecovery { TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, - null, null, 0L, null, 0L); + null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 3bb688e..c5dfbc1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -420,7 +420,7 @@ public class TestTaskAttempt { arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); - taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, + taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); // verify unregister is not invoked again verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); @@ -487,7 +487,7 @@ public class TestTaskAttempt { TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated", + taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); assertFalse( "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", @@ -576,7 +576,7 @@ public class TestTaskAttempt { arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); - taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, + taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); // verify unregister is not invoked again verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); @@ -747,7 +747,7 @@ public class TestTaskAttempt { assertEquals("0", taImpl.getDiagnostics().get(0)); assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); - taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1", + taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1", TaskAttemptTerminationCause.CONTAINER_EXITED)); // verify unregister is not invoked again verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index d3bd7b8..38d9935 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -502,7 +502,11 @@ public class TestHistoryEventsProtoConversion { "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, null, null, null, null, null, 2048, TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024); + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024, + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( + "host1", 19999), "inProgress", "Completed", "nodeHttpAddress"); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -523,6 +527,12 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getState()); Assert.assertEquals(event.getCounters(), deserializedEvent.getCounters()); + Assert.assertEquals(event.getContainerId(), + deserializedEvent.getContainerId()); + Assert.assertEquals(event.getNodeId(), + deserializedEvent.getNodeId()); + Assert.assertEquals(event.getNodeHttpAddress(), + deserializedEvent.getNodeHttpAddress()); logEvents(event, deserializedEvent); } { @@ -537,7 +547,11 @@ public class TestHistoryEventsProtoConversion { TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events, - null, 0, null, 0); + null, 0, null, 0, + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( + "host1", 19999), "inProgress", "Completed", "nodeHttpAddress"); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -550,6 +564,12 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getState()); Assert.assertEquals(event.getCounters(), deserializedEvent.getCounters()); + Assert.assertEquals(event.getContainerId(), + deserializedEvent.getContainerId()); + Assert.assertEquals(event.getNodeId(), + deserializedEvent.getNodeId()); + Assert.assertEquals(event.getNodeHttpAddress(), + deserializedEvent.getNodeHttpAddress()); Assert.assertEquals(event.getTaskAttemptError(), deserializedEvent.getTaskAttemptError()); Assert.assertEquals(events.size(), event.getDataEvents().size()); http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index b285196..ea683f7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -165,7 +165,8 @@ public class TestHistoryEventJsonConversion { case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, - null, null, null, null, 0, null, 0); + null, null, null, null, 0, null, 0, + containerId, nodeId, null, null, "nodeHttpAddress"); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index d6b518b..26d4d98 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -468,6 +468,22 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS, DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents())); } + if (event.getNodeId() != null) { + atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + } + if (event.getContainerId() != null) { + atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + } + if (event.getInProgressLogsUrl() != null) { + atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + } + if (event.getCompletedLogsUrl() != null) { + atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + } + if (event.getNodeHttpAddress() != null) { + atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + } + return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 49b6f9f..c5badaa 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -184,7 +184,8 @@ public class TestHistoryEventTimelineConversion { case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, - null, null, null, null, 0, null, 0); + null, null, null, null, 0, null, 0, + containerId, nodeId, null, null, "nodeHttpAddress"); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -519,7 +520,7 @@ public class TestHistoryEventTimelineConversion { TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime, - tezTaskAttemptID, allocationTime); + tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -542,7 +543,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, evt.getTimestamp()); final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(11, otherInfo.size()); + Assert.assertEquals(16, otherInfo.size()); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT)); Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME)); @@ -559,6 +560,11 @@ public class TestHistoryEventTimelineConversion { Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0); Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP)); Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); + Assert.assertEquals("inProgressURL", otherInfo.get(ATSConstants.IN_PROGRESS_LOGS_URL)); + Assert.assertEquals("logsURL", otherInfo.get(ATSConstants.COMPLETED_LOGS_URL)); + Assert.assertEquals(nodeId.toString(), otherInfo.get(ATSConstants.NODE_ID)); + Assert.assertEquals(containerId.toString(), otherInfo.get(ATSConstants.CONTAINER_ID)); + Assert.assertEquals("nodeHttpAddress", otherInfo.get(ATSConstants.NODE_HTTP_ADDRESS)); } @SuppressWarnings("unchecked")
