TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b08ca37e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b08ca37e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b08ca37e Branch: refs/heads/TEZ-2003 Commit: b08ca37e31595dab4941dc7ed0736464d0223bc9 Parents: 21d4e2d Author: Hitesh Shah <[email protected]> Authored: Mon Apr 27 13:50:28 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Apr 27 13:50:28 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 4 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 22 ++- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 +- .../tez/dag/history/HistoryEventType.java | 3 +- .../dag/history/events/DAGFinishedEvent.java | 15 ++- .../dag/history/events/DAGRecoveredEvent.java | 124 +++++++++++++++++ .../impl/HistoryEventJsonConversion.java | 42 ++++++ .../apache/tez/dag/app/TestRecoveryParser.java | 4 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 2 +- .../TestHistoryEventsProtoConversion.java | 23 +++- .../impl/TestHistoryEventJsonConversion.java | 7 +- .../ats/HistoryEventTimelineConversion.java | 35 +++++ .../ats/TestHistoryEventTimelineConversion.java | 133 +++++++++++++++---- 14 files changed, 376 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e78ee7e..a5c4a57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -146,6 +146,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI. TEZ-2365. Update tez-ui war's license/notice to reflect OFL license correctly. TEZ-2329. UI Query on final dag status performance improvement TEZ-2287. Deprecate VertexManagerPluginContext.getTaskContainer(). http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 26170f4..fd82e20 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -35,12 +35,12 @@ public class ATSConstants { public static final String EVENT_INFO = "eventinfo"; public static final String RELATED_ENTITIES = "relatedEntities"; public static final String PRIMARY_FILTERS = "primaryfilters"; - public static final String SECONDARY_FILTERS = "secondaryfilters"; public static final String OTHER_INFO = "otherinfo"; /* Section for related entities */ public static final String APPLICATION_ID = "applicationId"; public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId"; + public static final String COMPLETION_APPLICATION_ATTEMPT_ID = "completionApplicationAttemptId"; public static final String CONTAINER_ID = "containerId"; public static final String NODE_ID = "nodeId"; public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress"; @@ -52,6 +52,8 @@ public class ATSConstants { /* Tez-specific info */ public static final String DAG_PLAN = "dagPlan"; public static final String DAG_NAME = "dagName"; + public static final String DAG_STATE = "dagState"; + public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason"; public static final String VERTEX_NAME = "vertexName"; public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping"; public static final String SCHEDULED_TIME = "scheduledTime"; http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 8a914f6..90935ac 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -58,10 +58,9 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -162,6 +161,8 @@ import org.apache.tez.dag.utils.Graph; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.codehaus.jettison.json.JSONException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -1736,18 +1737,35 @@ public class DAGAppMaster extends AbstractService { DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(), DAGState.FAILED, classpathUrls); + DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID, + recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), + recoveredDAGData.recoveredDAG.getUserName(), + this.clock.getTime(), DAGState.FAILED, recoveredDAGData.reason); + this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(), + dagRecoveredEvent)); dagEventDispatcher.handle(recoverDAGEvent); this.state = DAGAppMasterState.RUNNING; } else { DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.dagState, classpathUrls); + DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID, + recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), + recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), + recoveredDAGData.dagState, null); + this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(), + dagRecoveredEvent)); dagEventDispatcher.handle(recoverDAGEvent); this.state = DAGAppMasterState.RUNNING; } } else { LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID()); _updateLoggers(recoveredDAGData.recoveredDAG, ""); + DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID, + recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), + recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime()); + this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(), + dagRecoveredEvent)); DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent( recoveredDAGData.recoveredDAG.getID(), classpathUrls); dagEventDispatcher.handle(recoverDAGEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 5540285..c47a0d7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1127,7 +1127,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, finishTime, DAGState.SUCCEEDED, "", getAllCounters(), - this.userName, this.dagName, taskStats); + this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId()); this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(dagId, finishEvt)); } @@ -1151,7 +1151,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, clock.getTime(), state, StringUtils.join(getDiagnostics(), LINE_SEPARATOR), - getAllCounters(), this.userName, this.dagName, taskStats); + getAllCounters(), this.userName, this.dagName, taskStats, + this.appContext.getApplicationAttemptId()); this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(dagId, finishEvt)); } http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index 17df58f..6949d21 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -40,5 +40,6 @@ public enum HistoryEventType { DAG_COMMIT_STARTED, VERTEX_COMMIT_STARTED, VERTEX_GROUP_COMMIT_STARTED, - VERTEX_GROUP_COMMIT_FINISHED + VERTEX_GROUP_COMMIT_FINISHED, + DAG_RECOVERED } http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java index b10a876..2f173a9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java @@ -23,8 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.app.dag.DAGState; @@ -41,8 +40,6 @@ import com.google.protobuf.ByteString; public class DAGFinishedEvent implements HistoryEvent, SummaryEvent { - private static final Logger LOG = LoggerFactory.getLogger(DAGFinishedEvent.class); - private TezDAGID dagID; private long startTime; private long finishTime; @@ -53,13 +50,16 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent { private String dagName; Map<String, Integer> dagTaskStats; + private ApplicationAttemptId applicationAttemptId; + public DAGFinishedEvent() { } public DAGFinishedEvent(TezDAGID dagId, long startTime, long finishTime, DAGState state, String diagnostics, TezCounters counters, - String user, String dagName, Map<String, Integer> dagTaskStats) { + String user, String dagName, Map<String, Integer> dagTaskStats, + ApplicationAttemptId applicationAttemptId) { this.dagID = dagId; this.startTime = startTime; this.finishTime = finishTime; @@ -69,6 +69,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent { this.user = user; this.dagName = dagName; this.dagTaskStats = dagTaskStats; + this.applicationAttemptId = applicationAttemptId; } @Override @@ -202,4 +203,8 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent { return dagTaskStats; } + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java new file mode 100644 index 0000000..5b44de2 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.events; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; + +public class DAGRecoveredEvent implements HistoryEvent { + + private final ApplicationAttemptId applicationAttemptId; + private final TezDAGID dagID; + private final long recoveredTime; + private final DAGState recoveredDagState; + private final String recoveryFailureReason; + private final String dagName; + private final String user; + + public DAGRecoveredEvent(ApplicationAttemptId applicationAttemptId, + TezDAGID dagId, String dagName, String user, + long recoveredTime, DAGState recoveredState, + String recoveryFailureReason) { + this.applicationAttemptId = applicationAttemptId; + this.dagID = dagId; + this.dagName = dagName; + this.user = user; + this.recoveredTime = recoveredTime; + this.recoveredDagState = recoveredState; + this.recoveryFailureReason = recoveryFailureReason; + } + + public DAGRecoveredEvent(ApplicationAttemptId applicationAttemptId, + TezDAGID dagId, String dagName, String user, long recoveredTime) { + this(applicationAttemptId, dagId, dagName, user, recoveredTime, null, null); + } + + @Override + public HistoryEventType getEventType() { + return HistoryEventType.DAG_RECOVERED; + } + + @Override + public boolean isRecoveryEvent() { + return false; + } + + @Override + public boolean isHistoryEvent() { + return true; + } + + @Override + public void toProtoStream(OutputStream outputStream) throws IOException { + throw new UnsupportedOperationException("Invalid operation for eventType " + + getEventType().name()); + } + + @Override + public void fromProtoStream(InputStream inputStream) throws IOException { + throw new UnsupportedOperationException("Invalid operation for eventType " + + getEventType().name()); + } + + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + public TezDAGID getDagID() { + return dagID; + } + + public long getRecoveredTime() { + return recoveredTime; + } + + public DAGState getRecoveredDagState() { + return recoveredDagState; + } + + public String getRecoveryFailureReason() { + return recoveryFailureReason; + } + + public String getDagName() { + return dagName; + } + + public String getUser() { + return user; + } + + @Override + public String toString() { + return "applicationAttemptId=" + + (applicationAttemptId != null ? applicationAttemptId.toString() : "null") + + ", dagId=" + (dagID != null ? dagID.toString() : "null") + + ", recoveredTime=" + recoveredTime + + ", recoveredState=" + (recoveredDagState != null ? recoveredDagState.name() : "null" ) + + ", recoveryFailureReason=" + recoveryFailureReason; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 22d95d8..07ce2f3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -33,6 +33,7 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent; import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -110,6 +111,9 @@ public class HistoryEventJsonConversion { case VERTEX_PARALLELISM_UPDATED: jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent); break; + case DAG_RECOVERED: + jsonObject = convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent); + break; case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: case VERTEX_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_STARTED: @@ -124,6 +128,42 @@ public class HistoryEventJsonConversion { return jsonObject; } + private static JSONObject convertDAGRecoveredEvent(DAGRecoveredEvent event) + throws JSONException { + JSONObject jsonObject = new JSONObject(); + jsonObject.put(ATSConstants.ENTITY, + event.getDagID().toString()); + jsonObject.put(ATSConstants.ENTITY_TYPE, + EntityTypes.TEZ_DAG_ID.name()); + + // Related Entities not needed as should have been done in + // dag submission event + + JSONArray events = new JSONArray(); + JSONObject recoverEvent = new JSONObject(); + recoverEvent.put(ATSConstants.TIMESTAMP, event.getRecoveredTime()); + recoverEvent.put(ATSConstants.EVENT_TYPE, + HistoryEventType.DAG_RECOVERED.name()); + + JSONObject recoverEventInfo = new JSONObject(); + recoverEventInfo.put(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + if (event.getRecoveredDagState() != null) { + recoverEventInfo.put(ATSConstants.DAG_STATE, event.getRecoveredDagState().name()); + } + if (event.getRecoveryFailureReason() != null) { + recoverEventInfo.put(ATSConstants.RECOVERY_FAILURE_REASON, + event.getRecoveryFailureReason()); + } + + recoverEvent.put(ATSConstants.EVENT_INFO, recoverEventInfo); + events.put(recoverEvent); + + jsonObject.put(ATSConstants.EVENTS, events); + + return jsonObject; + } + private static JSONObject convertAppLaunchedEvent(AppLaunchedEvent event) throws JSONException { JSONObject jsonObject = new JSONObject(); jsonObject.put(ATSConstants.ENTITY, @@ -327,6 +367,8 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); otherInfo.put(ATSConstants.COUNTERS, DAGUtils.convertCountersToJSON(event.getTezCounters())); + otherInfo.put(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); final Map<String, Integer> dagTaskStats = event.getDagTaskStats(); if (dagTaskStats != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index a256244..4bb0615 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -169,6 +169,7 @@ public class TestRecoveryParser { // skipAllOtherEvents due to dag finished @Test (timeout = 5000) public void testSkipAllOtherEvents_2() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); TezDAGID dagID = TezDAGID.getInstance(appId, 1); AppContext appContext = mock(AppContext.class); @@ -188,7 +189,8 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); rService.handle(new DAGHistoryEvent(dagID, - new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null))); + new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null, + appAttemptId))); rService.handle(new DAGHistoryEvent(dagID, new DAGStartedEvent(dagID, 1L, "user", "dag1"))); rService.stop(); http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 58c55c2..bd4653b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -144,7 +144,7 @@ public class TestDAGRecovery { private void restoreFromDAGFinishedEvent(DAGState finalState) { DAGState recoveredState = dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime, - finalState, "", tezCounters, user, dagName, null)); + finalState, "", tezCounters, user, dagName, null, null)); assertEquals(finishTime, dag.finishTime); assertFalse(dag.recoveryCommitInProgress); assertEquals(finalState, recoveredState); http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index bf61ff0..302700c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -217,7 +217,7 @@ public class TestHistoryEventsProtoConversion { { DAGFinishedEvent event = new DAGFinishedEvent( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l, - DAGState.FAILED, null, null, "user", "dagName", null); + DAGState.FAILED, null, null, "user", "dagName", null, null); DAGFinishedEvent deserializedEvent = (DAGFinishedEvent) testProtoConversion(event); Assert.assertEquals( @@ -238,7 +238,7 @@ public class TestHistoryEventsProtoConversion { DAGFinishedEvent event = new DAGFinishedEvent( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l, DAGState.FAILED, "bad diagnostics", tezCounters, - "user", "dagName", null); + "user", "dagName", null, null); DAGFinishedEvent deserializedEvent = (DAGFinishedEvent) testProtoConversion(event); Assert.assertEquals( @@ -717,10 +717,29 @@ public class TestHistoryEventsProtoConversion { case VERTEX_GROUP_COMMIT_FINISHED: testVertexGroupCommitFinishedEvent(); break; + case DAG_RECOVERED: + testDAGRecoveredEvent(); + break; default: throw new Exception("Unhandled Event type in Unit tests: " + eventType); } } } + private void testDAGRecoveredEvent() { + DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), + "mockDagname", "mockuser", 100334l); + try { + testProtoConversion(dagRecoveredEvent); + Assert.fail("Proto conversion should have failed"); + } catch (UnsupportedOperationException e) { + // Expected + } catch (IOException e) { + Assert.fail("Proto conversion should have failed with Unsupported Exception"); + } + + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index bbf29e3..c6749af 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -50,6 +50,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -132,7 +133,7 @@ public class TestHistoryEventJsonConversion { break; case DAG_FINISHED: event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR, - null, null, user, dagPlan.getName(), null); + null, null, user, dagPlan.getName(), null, applicationAttemptId); break; case VERTEX_INITIALIZED: event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), @@ -186,6 +187,10 @@ public class TestHistoryEventJsonConversion { case VERTEX_GROUP_COMMIT_FINISHED: event = new VertexGroupCommitFinishedEvent(); break; + case DAG_RECOVERED: + event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), user, + 1l); + break; default: Assert.fail("Unhandled event type " + eventType); } http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index fdd8f19..7c804f5 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -40,6 +40,7 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent; import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -115,6 +116,10 @@ public class HistoryEventTimelineConversion { timelineEntity = convertVertexParallelismUpdatedEvent( (VertexParallelismUpdatedEvent) historyEvent); break; + case DAG_RECOVERED: + timelineEntity = convertDAGRecoveredEvent( + (DAGRecoveredEvent) historyEvent); + break; case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: case VERTEX_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_STARTED: @@ -129,6 +134,34 @@ public class HistoryEventTimelineConversion { return timelineEntity; } + private static TimelineEntity convertDAGRecoveredEvent(DAGRecoveredEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent recoverEvt = new TimelineEvent(); + recoverEvt.setEventType(HistoryEventType.DAG_RECOVERED.name()); + recoverEvt.setTimestamp(event.getRecoveredTime()); + recoverEvt.addEventInfo(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + if (event.getRecoveredDagState() != null) { + recoverEvt.addEventInfo(ATSConstants.DAG_STATE, event.getRecoveredDagState().name()); + } + if (event.getRecoveryFailureReason() != null) { + recoverEvt.addEventInfo(ATSConstants.RECOVERY_FAILURE_REASON, + event.getRecoveryFailureReason()); + } + + atsEntity.addEvent(recoverEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + return atsEntity; + } + private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) { TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId("tez_" @@ -270,6 +303,8 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); atsEntity.addOtherInfo(ATSConstants.COUNTERS, DAGUtils.convertCountersToATSMap(event.getTezCounters())); + atsEntity.addOtherInfo(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); final Map<String, Integer> dagTaskStats = event.getDagTaskStats(); if (dagTaskStats != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 14330ba..3d2b662 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -56,6 +56,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -137,7 +138,7 @@ public class TestHistoryEventTimelineConversion { break; case DAG_FINISHED: event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR, - null, null, user, dagPlan.getName(), null); + null, null, user, dagPlan.getName(), null, applicationAttemptId); break; case VERTEX_INITIALIZED: event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), @@ -191,6 +192,10 @@ public class TestHistoryEventTimelineConversion { case VERTEX_GROUP_COMMIT_FINISHED: event = new VertexGroupCommitFinishedEvent(); break; + case DAG_RECOVERED: + event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), + user, random.nextLong()); + break; default: Assert.fail("Unhandled event type " + eventType); } @@ -238,15 +243,15 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION)); Assert.assertEquals(user, timelineEntity.getOtherInfo().get(ATSConstants.USER)); Assert.assertEquals(applicationId.toString(), - timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); + timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); Map<String, String> config = - (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG); + (Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.CONFIG); Assert.assertEquals(conf.get("foo"), config.get("foo")); Assert.assertEquals(conf.get("applicationId"), config.get("applicationId")); Map<String, String> versionInfo = - (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION); + (Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION); Assert.assertEquals(mockVersionInfo.getVersion(), versionInfo.get(ATSConstants.VERSION)); Assert.assertEquals(mockVersionInfo.getRevision(), @@ -310,7 +315,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( applicationAttemptId.getApplicationId().toString())); - Assert.assertEquals(containerId.toString(),timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID)); + Assert.assertEquals(containerId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID)); Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); @@ -381,6 +386,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME)); Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS)); } + @Test(timeout = 5000) public void testConvertDAGSubmittedEvent() { long submitTime = random.nextLong(); @@ -426,11 +432,11 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(applicationAttemptId.getApplicationId().toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); Assert.assertEquals(user, - timelineEntity.getOtherInfo().get(ATSConstants.USER)); + timelineEntity.getOtherInfo().get(ATSConstants.USER)); } @Test(timeout = 5000) - public void testConvertTaskAttemptFinishedEvent(){ + public void testConvertTaskAttemptFinishedEvent() { String vertexName = "testVertex"; long startTime = random.nextLong(); long finishTime = startTime + 1234; @@ -478,7 +484,7 @@ public class TestHistoryEventTimelineConversion { public void testConvertDAGInitializedEvent() { long initTime = random.nextLong(); - Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>(); + Map<String, TezVertexID> nameIdMap = new HashMap<String, TezVertexID>(); nameIdMap.put("foo", tezVertexID); DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName", @@ -518,12 +524,12 @@ public class TestHistoryEventTimelineConversion { public void testConvertDAGFinishedEvent() { long finishTime = random.nextLong(); long startTime = random.nextLong(); - Map<String,Integer> taskStats = new HashMap<String, Integer>(); + Map<String, Integer> taskStats = new HashMap<String, Integer>(); taskStats.put("FOO", 100); taskStats.put("BAR", 200); DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR, - "diagnostics", null, user, dagPlan.getName(), taskStats); + "diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); @@ -549,21 +555,23 @@ public class TestHistoryEventTimelineConversion { DAGState.ERROR.name())); Assert.assertEquals(startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); Assert.assertEquals(finishTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); Assert.assertEquals(finishTime - startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS)); Assert.assertEquals(DAGState.ERROR.name(), timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); Assert.assertEquals("diagnostics", timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); + Assert.assertEquals(applicationAttemptId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID)); Assert.assertEquals(100, - ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); + ((Integer) timelineEntity.getOtherInfo().get("FOO")).intValue()); Assert.assertEquals(200, - ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); + ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue()); } @Test(timeout = 5000) @@ -602,13 +610,13 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME)); Assert.assertEquals(initedTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); Assert.assertEquals(initRequestedTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue()); Assert.assertEquals(initedTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); Assert.assertEquals(numTasks, - ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue()); + ((Integer) timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue()); } @Test(timeout = 5000) @@ -618,12 +626,12 @@ public class TestHistoryEventTimelineConversion { long startRequestedTime = random.nextLong(); long startTime = random.nextLong(); long finishTime = random.nextLong(); - Map<String,Integer> taskStats = new HashMap<String, Integer>(); + Map<String, Integer> taskStats = new HashMap<String, Integer>(); taskStats.put("FOO", 100); taskStats.put("BAR", 200); VertexStats vertexStats = new VertexStats(); - VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime, + VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1, initRequestedTime, initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR, "diagnostics", null, vertexStats, taskStats); @@ -649,9 +657,9 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); Assert.assertEquals(finishTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); Assert.assertEquals(finishTime - startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); Assert.assertEquals(VertexState.ERROR.name(), timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); Assert.assertEquals("diagnostics", @@ -660,9 +668,9 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS)); Assert.assertEquals(100, - ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); + ((Integer) timelineEntity.getOtherInfo().get("FOO")).intValue()); Assert.assertEquals(200, - ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); + ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue()); } @Test(timeout = 5000) @@ -702,9 +710,9 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); Assert.assertEquals(scheduleTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue()); Assert.assertEquals(startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); + ((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); Assert.assertTrue(TaskState.SCHEDULED.name() .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); } @@ -847,5 +855,74 @@ public class TestHistoryEventTimelineConversion { } + @Test(timeout = 5000) + public void testConvertDAGRecoveredEvent() { + long recoverTime = random.nextLong(); + + DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, + dagPlan.getName(), user, recoverTime); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_RECOVERED.name(), timelineEvent.getEventType()); + Assert.assertEquals(recoverTime, timelineEvent.getTimestamp()); + + Assert.assertTrue(timelineEvent.getEventInfo().containsKey(ATSConstants.APPLICATION_ATTEMPT_ID)); + Assert.assertEquals(applicationAttemptId.toString(), + timelineEvent.getEventInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID)); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("DAGPlanMock")); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + } + + @Test(timeout = 5000) + public void testConvertDAGRecoveredEvent2() { + long recoverTime = random.nextLong(); + + DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, + dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason"); + + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_RECOVERED.name(), timelineEvent.getEventType()); + Assert.assertEquals(recoverTime, timelineEvent.getTimestamp()); + + Assert.assertTrue(timelineEvent.getEventInfo().containsKey(ATSConstants.APPLICATION_ATTEMPT_ID)); + Assert.assertEquals(applicationAttemptId.toString(), + timelineEvent.getEventInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID)); + Assert.assertEquals(DAGState.ERROR.name(), + timelineEvent.getEventInfo().get(ATSConstants.DAG_STATE)); + Assert.assertEquals("mock reason", + timelineEvent.getEventInfo().get(ATSConstants.RECOVERY_FAILURE_REASON)); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("DAGPlanMock")); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + } + -} +} \ No newline at end of file
