Repository: tez Updated Branches: refs/heads/master 59f56a540 -> e9d0b1b26
TEZ-3611. Create lightweight summary events for ATS. (harishjp) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e9d0b1b2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e9d0b1b2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e9d0b1b2 Branch: refs/heads/master Commit: e9d0b1b266f93a15029159b2da94c5d438d7408b Parents: 59f56a5 Author: Harish JP <[email protected]> Authored: Thu Apr 20 15:42:16 2017 +0530 Committer: Harish JP <[email protected]> Committed: Thu Apr 20 15:42:16 2017 +0530 ---------------------------------------------------------------------- .../org/apache/tez/common/ATSConstants.java | 1 + .../tez/dag/history/logging/EntityTypes.java | 1 + .../org/apache/tez/history/ATSImportTool.java | 19 ++- .../logging/ats/TimelineCachePluginImpl.java | 4 +- .../ats/TestTimelineCachePluginImpl.java | 2 + .../ats/ATSV15HistoryLoggingService.java | 11 +- .../ats/TestATSV15HistoryLoggingService.java | 10 +- .../logging/ats/ATSHistoryLoggingService.java | 8 +- .../ats/HistoryEventTimelineConversion.java | 150 ++++++++++------- .../ats/TestATSHistoryLoggingService.java | 8 +- .../ats/TestHistoryEventTimelineConversion.java | 168 ++++++++++++++++--- 11 files changed, 282 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 25c802e..6e07849 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -47,6 +47,7 @@ public class ATSConstants { public static final String USER = "user"; public static final String CALLER_CONTEXT_ID = "callerId"; public static final String CALLER_CONTEXT_TYPE = "callerType"; + public static final String CALLER_CONTEXT = "callerContext"; /* Keys used in other info */ public static final String APP_SUBMIT_TIME = "appSubmitTime"; http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java index e2f0882..6f6205d 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java +++ b/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java @@ -23,6 +23,7 @@ public enum EntityTypes { TEZ_APPLICATION_ATTEMPT, TEZ_CONTAINER_ID, TEZ_DAG_ID, + TEZ_DAG_EXTRA_INFO, TEZ_VERTEX_ID, TEZ_TASK_ID, TEZ_TASK_ATTEMPT_ID, http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java index 3efeb5a..fee226a 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java @@ -35,7 +35,6 @@ import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.MissingOptionException; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -50,6 +49,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.history.parser.datamodel.Constants; import org.apache.tez.history.parser.utils.Utils; @@ -68,6 +68,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLEncoder; +import java.util.Iterator; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -191,6 +192,22 @@ public class ATSImportTool extends Configured implements Tool { //Download dag String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId); JSONObject dagRoot = getJsonRootEntity(dagUrl); + + // We have added dag extra info, if we find any from ATS we copy the info into dag object + // extra info. + String dagExtraInfoUrl = String.format("%s/%s/%s", baseUri, EntityTypes.TEZ_DAG_EXTRA_INFO, + dagId); + JSONObject dagExtraInfo = getJsonRootEntity(dagExtraInfoUrl); + if (dagExtraInfo.has(Constants.OTHER_INFO)) { + JSONObject dagOtherInfo = dagRoot.getJSONObject(Constants.OTHER_INFO); + JSONObject extraOtherInfo = dagExtraInfo.getJSONObject(Constants.OTHER_INFO); + @SuppressWarnings("unchecked") + Iterator<String> iter = extraOtherInfo.keys(); + while (iter.hasNext()) { + String key = iter.next(); + dagOtherInfo.put(key, extraOtherInfo.get(key)); + } + } finalJson.put(Constants.DAG, dagRoot); //Create a zip entry with dagId as its name. http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java index 8269714..d211feb 100644 --- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java +++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java @@ -50,6 +50,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implement static { knownEntityTypes = Sets.newHashSet( EntityTypes.TEZ_DAG_ID.name(), + EntityTypes.TEZ_DAG_EXTRA_INFO.name(), EntityTypes.TEZ_VERTEX_ID.name(), EntityTypes.TEZ_TASK_ID.name(), EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), @@ -84,7 +85,8 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implement || entityId == null || entityId.isEmpty()) { return null; } - if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) { + if (entityType.equals(EntityTypes.TEZ_DAG_ID.name()) || + entityType.equals(EntityTypes.TEZ_DAG_EXTRA_INFO.name())) { TezDAGID dagId = TezDAGID.fromString(entityId); if (dagId != null) { return createTimelineEntityGroupIds(dagId); http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java index 3d1af63..1bfa0a1 100644 --- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java +++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java @@ -101,12 +101,14 @@ public class TestTimelineCachePluginImpl { typeIdMap1 = new HashMap<String, String>(); typeIdMap1.put(EntityTypes.TEZ_DAG_ID.name(), dagID1.toString()); + typeIdMap1.put(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), dagID1.toString()); typeIdMap1.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID1.toString()); typeIdMap1.put(EntityTypes.TEZ_TASK_ID.name(), taskID1.toString()); typeIdMap1.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID1.toString()); typeIdMap2 = new HashMap<String, String>(); typeIdMap2.put(EntityTypes.TEZ_DAG_ID.name(), dagID2.toString()); + typeIdMap2.put(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), dagID2.toString()); typeIdMap2.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID2.toString()); typeIdMap2.put(EntityTypes.TEZ_TASK_ID.name(), taskID2.toString()); typeIdMap2.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID2.toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java index a095cbc..a71f0d8 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.history.logging.ats; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -375,16 +376,20 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) { return; } - - TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity( + TimelineEntityGroupId groupId = getGroupId(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities( event.getHistoryEvent()); + for (TimelineEntity entity : entities) { + logEntity(groupId, entity, domainId); + } + } + private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) { if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) { historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); } try { - TimelineEntityGroupId groupId = getGroupId(event); TimelinePutResponse response = timelineClient.putEntities( appContext.getApplicationAttemptId(), groupId, entity); if (response != null http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java index ef5da81..96c3c80 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java @@ -111,7 +111,7 @@ public class TestATSV15HistoryLoggingService { List<TimelineEntity> nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.toString())); assertNotNull(nonGroupedDagEvents); - assertEquals(4, nonGroupedDagEvents.size()); + assertEquals(5, nonGroupedDagEvents.size()); service.stop(); } @@ -139,7 +139,7 @@ public class TestATSV15HistoryLoggingService { List<TimelineEntity> nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.toString())); assertNotNull(nonGroupedDagEvents); - assertEquals(4, nonGroupedDagEvents.size()); + assertEquals(5, nonGroupedDagEvents.size()); service.stop(); } @@ -185,7 +185,7 @@ public class TestATSV15HistoryLoggingService { List<TimelineEntity> groupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup))); assertNotNull(groupedDagEvents); - assertEquals(8, groupedDagEvents.size()); + assertEquals(10, groupedDagEvents.size()); nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId3.toString())); @@ -194,7 +194,7 @@ public class TestATSV15HistoryLoggingService { groupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup))); assertNotNull(groupedDagEvents); - assertEquals(4, groupedDagEvents.size()); + assertEquals(5, groupedDagEvents.size()); service.stop(); } @@ -338,7 +338,7 @@ public class TestATSV15HistoryLoggingService { // calls were made with correct domain ids. verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id")); - verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-id")); + verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("dag-id")); service.stop(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index dc215fd..6d035cc 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -336,11 +336,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) { continue; } - TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity( + List<TimelineEntity> eventEntities = HistoryEventTimelineConversion.convertToTimelineEntities( event.getHistoryEvent()); - entities.add(entity); + entities.addAll(eventEntities); if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); + for (TimelineEntity entity: eventEntities) { + historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index faccc98..235a292 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -19,7 +19,9 @@ package org.apache.tez.dag.history.logging.ats; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; @@ -31,6 +33,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.app.web.AMWebController; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -56,82 +59,82 @@ import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; import org.apache.tez.dag.records.TezVertexID; +import com.google.common.collect.Lists; + public class HistoryEventTimelineConversion { - public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) { - if (!historyEvent.isHistoryEvent()) { + private static void validateEvent(HistoryEvent event) { + if (!event.isHistoryEvent()) { throw new UnsupportedOperationException("Invalid Event, does not support history" - + ", eventType=" + historyEvent.getEventType()); + + ", eventType=" + event.getEventType()); } - TimelineEntity timelineEntity; + } + + public static List<TimelineEntity> convertToTimelineEntities(HistoryEvent historyEvent) { + validateEvent(historyEvent); switch (historyEvent.getEventType()) { case APP_LAUNCHED: - timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent); - break; + return Collections.singletonList(convertAppLaunchedEvent((AppLaunchedEvent) historyEvent)); case AM_LAUNCHED: - timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent); - break; + return Collections.singletonList(convertAMLaunchedEvent((AMLaunchedEvent) historyEvent)); case AM_STARTED: - timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent); - break; + return Collections.singletonList(convertAMStartedEvent((AMStartedEvent) historyEvent)); case CONTAINER_LAUNCHED: - timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent); - break; + return Collections.singletonList( + convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent)); case CONTAINER_STOPPED: - timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent); - break; + return Collections.singletonList( + convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent)); case DAG_SUBMITTED: - timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent); - break; + return Lists.newArrayList( + convertDAGSubmittedToDAGExtraInfoEntity((DAGSubmittedEvent)historyEvent), + convertDAGSubmittedEvent((DAGSubmittedEvent)historyEvent)); case DAG_INITIALIZED: - timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent); - break; + return Collections.singletonList( + convertDAGInitializedEvent((DAGInitializedEvent) historyEvent)); case DAG_STARTED: - timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent); - break; + return Collections.singletonList(convertDAGStartedEvent((DAGStartedEvent) historyEvent)); case DAG_FINISHED: - timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent); - break; + return Lists.newArrayList( + convertDAGFinishedToDAGExtraInfoEntity((DAGFinishedEvent) historyEvent), + convertDAGFinishedEvent((DAGFinishedEvent) historyEvent)); case VERTEX_INITIALIZED: - timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent); - break; + return Collections.singletonList( + convertVertexInitializedEvent((VertexInitializedEvent) historyEvent)); case VERTEX_STARTED: - timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent); - break; + return Collections.singletonList( + convertVertexStartedEvent((VertexStartedEvent) historyEvent)); case VERTEX_FINISHED: - timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent); - break; + return Collections.singletonList( + convertVertexFinishedEvent((VertexFinishedEvent) historyEvent)); case TASK_STARTED: - timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent); - break; + return Collections.singletonList(convertTaskStartedEvent((TaskStartedEvent) historyEvent)); case TASK_FINISHED: - timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent); - break; + return Collections.singletonList( + convertTaskFinishedEvent((TaskFinishedEvent) historyEvent)); case TASK_ATTEMPT_STARTED: - timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent); - break; + return Collections.singletonList( + convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent)); case TASK_ATTEMPT_FINISHED: - timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); - break; + return Collections.singletonList( + convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent)); case VERTEX_CONFIGURE_DONE: - timelineEntity = convertVertexReconfigureDoneEvent( - (VertexConfigurationDoneEvent) historyEvent); - break; + return Collections.singletonList( + convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent)); case DAG_RECOVERED: - timelineEntity = convertDAGRecoveredEvent( - (DAGRecoveredEvent) historyEvent); - break; + return Collections.singletonList( + convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent)); case VERTEX_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_FINISHED: case DAG_COMMIT_STARTED: + case DAG_KILL_REQUEST: throw new UnsupportedOperationException("Invalid Event, does not support history" + ", eventType=" + historyEvent.getEventType()); - default: - throw new UnsupportedOperationException("Unhandled Event" - + ", eventType=" + historyEvent.getEventType()); + // Do not add default, if a new event type is added, we'll get a warning for the switch. } - return timelineEntity; + throw new UnsupportedOperationException("Unhandled Event, eventType=" + + historyEvent.getEventType()); } private static TimelineEntity convertDAGRecoveredEvent(DAGRecoveredEvent event) { @@ -309,8 +312,6 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); - atsEntity.addOtherInfo(ATSConstants.COUNTERS, - DAGUtils.convertCountersToATSMap(event.getTezCounters())); atsEntity.addOtherInfo(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID, event.getApplicationAttemptId().toString()); @@ -324,6 +325,24 @@ public class HistoryEventTimelineConversion { return atsEntity; } + private static TimelineEntity convertDAGFinishedToDAGExtraInfoEntity(DAGFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_EXTRA_INFO.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), event.getDagID().toString()); + + TimelineEvent submitEvt = new TimelineEvent(); + submitEvt.setEventType(HistoryEventType.DAG_FINISHED.name()); + submitEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(submitEvt); + + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + return atsEntity; + } + + private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) { TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId(event.getDagID().toString()); @@ -397,19 +416,15 @@ public class HistoryEventTimelineConversion { if (event.getDAGPlan().hasCallerContext() && event.getDAGPlan().getCallerContext().hasCallerId()) { - atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, - event.getDAGPlan().getCallerContext().getCallerId()); + CallerContextProto callerContext = event.getDagPlan().getCallerContext(); + atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId()); + atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId()); + atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT, callerContext.getContext()); } if (event.getQueueName() != null) { atsEntity.addPrimaryFilter(ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); } - try { - atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, - DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); - } catch (IOException e) { - throw new TezUncheckedException(e); - } atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID, event.getApplicationAttemptId().getApplicationId().toString()); atsEntity.addOtherInfo(ATSConstants.APPLICATION_ATTEMPT_ID, @@ -433,6 +448,29 @@ public class HistoryEventTimelineConversion { return atsEntity; } + private static TimelineEntity convertDAGSubmittedToDAGExtraInfoEntity(DAGSubmittedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_EXTRA_INFO.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), event.getDagID().toString()); + + TimelineEvent submitEvt = new TimelineEvent(); + submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name()); + submitEvt.setTimestamp(event.getSubmitTime()); + atsEntity.addEvent(submitEvt); + + atsEntity.setStartTime(event.getSubmitTime()); + + try { + atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + return atsEntity; + } + private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) { TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId(event.getTaskAttemptID().toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java index a641cda..6603f4f 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -248,7 +248,7 @@ public class TestATSHistoryLoggingService { .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); // All calls made with session domain id. - verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id")); + verify(historyACLPolicyManager, times(6)).updateTimelineEntityDomain(any(), eq("session-id")); } @Test(timeout=10000) @@ -299,7 +299,7 @@ public class TestATSHistoryLoggingService { // All calls made with session domain id. verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id")); - Assert.assertEquals(5, atsEntitiesCounter); + Assert.assertEquals(6, atsEntitiesCounter); } @Test(timeout=10000) @@ -333,7 +333,7 @@ public class TestATSHistoryLoggingService { // All calls made with session domain id. verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("test-domain")); - verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-domain")); + verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("dag-domain")); } @Test(timeout=10000) @@ -433,7 +433,7 @@ public class TestATSHistoryLoggingService { // All calls made with session domain id. verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); - Assert.assertEquals(5, atsEntitiesCounter); + Assert.assertEquals(6, atsEntitiesCounter); } private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 28fd5da..1663cb0 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -221,7 +221,7 @@ public class TestHistoryEventTimelineConversion { if (event == null || !event.isHistoryEvent()) { continue; } - HistoryEventTimelineConversion.convertToTimelineEntity(event); + HistoryEventTimelineConversion.convertToTimelineEntities(event); } } @@ -259,7 +259,7 @@ public class TestHistoryEventTimelineConversion { MockVersionInfo mockVersionInfo = new MockVersionInfo(); AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime, submitTime, user, conf, mockVersionInfo); - HistoryEventTimelineConversion.convertToTimelineEntity(event); + HistoryEventTimelineConversion.convertToTimelineEntities(event); } finally { shutdown.set(true); confChanger.join(); @@ -279,7 +279,9 @@ public class TestHistoryEventTimelineConversion { AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime, submitTime, user, conf, mockVersionInfo); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); @@ -322,7 +324,9 @@ public class TestHistoryEventTimelineConversion { long submitTime = random.nextLong(); AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType()); @@ -357,7 +361,9 @@ public class TestHistoryEventTimelineConversion { AMStartedEvent event = new AMStartedEvent(applicationAttemptId, startTime, user); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType()); @@ -383,7 +389,9 @@ public class TestHistoryEventTimelineConversion { long launchTime = random.nextLong(); ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime, applicationAttemptId); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); @@ -414,7 +422,9 @@ public class TestHistoryEventTimelineConversion { int exitStatus = random.nextInt(); ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus, applicationAttemptId); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); @@ -446,7 +456,10 @@ public class TestHistoryEventTimelineConversion { long startTime = random.nextLong(); String dagName = "testDagName"; DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); @@ -477,7 +490,21 @@ public class TestHistoryEventTimelineConversion { DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan, applicationAttemptId, null, user, null, containerLogs, queueName); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(2, entities.size()); + + + if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) { + assertDagSubmittedEntity(submitTime, event, entities.get(0)); + assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(1)); + } else { + assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(0)); + assertDagSubmittedEntity(submitTime, event, entities.get(1)); + } + } + + private void assertDagSubmittedEntity(long submitTime, DAGSubmittedEvent event, + TimelineEntity timelineEntity) { Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); @@ -511,10 +538,9 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_QUEUE_NAME) - .contains(queueName)); + .contains(event.getQueueName())); Assert.assertEquals(9, timelineEntity.getOtherInfo().size()); - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); Assert.assertEquals(applicationAttemptId.toString(), @@ -534,9 +560,31 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals( timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE), dagPlan.getCallerContext().getCallerType()); + Assert.assertEquals(dagPlan.getCallerContext().getContext(), + timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT)); Assert.assertEquals( - queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME)); + event.getQueueName(), timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME)); + + } + + private void assertDagSubmittedExtraInfoEntity(long submitTime, DAGSubmittedEvent event, + TimelineEntity timelineEntity) { + Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue(timelineEntity.getRelatedEntities() + .get(EntityTypes.TEZ_DAG_ID.name()).contains(tezDAGID.toString())); + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(submitTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue()); + Assert.assertEquals(0, timelineEntity.getPrimaryFilters().size()); + Assert.assertEquals(1, timelineEntity.getOtherInfo().size()); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); } @SuppressWarnings("unchecked") @@ -561,7 +609,10 @@ public class TestHistoryEventTimelineConversion { TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events, null, creationTime, tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -613,8 +664,11 @@ public class TestHistoryEventTimelineConversion { creationTime, tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); - TimelineEntity timelineEntityWithNullFailureType = - HistoryEventTimelineConversion.convertToTimelineEntity(eventWithNullFailureType); + List<TimelineEntity> evtEntities = HistoryEventTimelineConversion.convertToTimelineEntities( + eventWithNullFailureType); + Assert.assertEquals(1, evtEntities.size()); + TimelineEntity timelineEntityWithNullFailureType = evtEntities.get(0); + Assert.assertNull( timelineEntityWithNullFailureType.getOtherInfo().get(ATSConstants.TASK_FAILURE_TYPE)); } @@ -630,7 +684,11 @@ public class TestHistoryEventTimelineConversion { DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName", nameIdMap); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); @@ -671,7 +729,20 @@ public class TestHistoryEventTimelineConversion { DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR, "diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId, dagPlan); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(2, entities.size()); + + if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) { + assertDagFinishedEntity(finishTime, startTime, event, entities.get(0)); + assertDagFinishedExtraInfoEntity(finishTime, entities.get(1)); + } else { + assertDagFinishedExtraInfoEntity(finishTime, entities.get(0)); + assertDagFinishedEntity(finishTime, startTime, event, entities.get(1)); + } + } + + private void assertDagFinishedEntity(long finishTime, long startTime, DAGFinishedEvent event, + TimelineEntity timelineEntity) { Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); @@ -703,7 +774,6 @@ public class TestHistoryEventTimelineConversion { ((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); Assert.assertEquals(finishTime - startTime, ((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS)); Assert.assertEquals(DAGState.ERROR.name(), timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); Assert.assertEquals("diagnostics", @@ -717,6 +787,23 @@ public class TestHistoryEventTimelineConversion { ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue()); } + private void assertDagFinishedExtraInfoEntity(long finishTime, TimelineEntity timelineEntity) { + Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.TEZ_DAG_ID).contains( + tezDAGID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType()); + Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS)); + } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testConvertVertexInitializedEvent() { @@ -731,7 +818,11 @@ public class TestHistoryEventTimelineConversion { .setTaskSchedulerClassName("def1") .setTaskCommunicatorClassName("ghi1")); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); @@ -794,7 +885,10 @@ public class TestHistoryEventTimelineConversion { VertexStartedEvent event = new VertexStartedEvent(tezVertexID, startRequestedTime, startTime); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); @@ -844,7 +938,10 @@ public class TestHistoryEventTimelineConversion { .setTaskSchedulerClassName("def1") .setTaskCommunicatorClassName("ghi1")); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); @@ -909,7 +1006,10 @@ public class TestHistoryEventTimelineConversion { long startTime = random.nextLong(); TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); @@ -953,7 +1053,10 @@ public class TestHistoryEventTimelineConversion { TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); @@ -1009,7 +1112,9 @@ public class TestHistoryEventTimelineConversion { TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime, tezTaskAttemptID, state, diagnostics, counters, 3); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); @@ -1054,7 +1159,10 @@ public class TestHistoryEventTimelineConversion { VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null, edgeMgrs, null, true); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType()); Assert.assertEquals(vId.toString(), timelineEntity.getEntityId()); Assert.assertEquals(1, timelineEntity.getEvents().size()); @@ -1092,7 +1200,10 @@ public class TestHistoryEventTimelineConversion { DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), user, recoverTime, containerLogs); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); @@ -1128,7 +1239,10 @@ public class TestHistoryEventTimelineConversion { dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason", containerLogs); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event); + Assert.assertEquals(1, entities.size()); + TimelineEntity timelineEntity = entities.get(0); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
