Repository: tez Updated Branches: refs/heads/branch-0.8 c797d6e3f -> 26d179f8c
TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels. Contributed by Harish Jaiprakash. (cherry picked from commit c0270cb30a582ab2b5cbc8442054ce0c2a766c15) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/26d179f8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/26d179f8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/26d179f8 Branch: refs/heads/branch-0.8 Commit: 26d179f8c74e6ddfdfd3f4aaaa357c6e9ed2eb81 Parents: c797d6e Author: Siddharth Seth <[email protected]> Authored: Sun Feb 5 18:24:15 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Sun Feb 5 18:25:23 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/api/HistoryLogLevel.java | 1 + .../apache/tez/dag/api/TezConfiguration.java | 12 +- .../org/apache/tez/common/TezUtilsInternal.java | 19 ++++ .../tez/dag/history/HistoryEventHandler.java | 114 ++++++++++++++++--- .../tez/dag/history/HistoryEventType.java | 4 +- .../dag/history/TestHistoryEventHandler.java | 79 ++++++++++--- 7 files changed, 198 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6c1e7c1..55fdfc9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results. TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics TEZ-3574. Container reuse won't pickup extra dag level local resource. http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java index 5eb4785..96d74f9 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java @@ -34,6 +34,7 @@ public enum HistoryLogLevel { DAG, VERTEX, TASK, + TASK_ATTEMPT, ALL; public static final HistoryLogLevel DEFAULT = ALL; http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 62d9c9a..6144399 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -20,7 +20,6 @@ package org.apache.tez.dag.api; import java.lang.reflect.Field; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1244,6 +1243,17 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "history.logging.log.level"; /** + * List of comma separated enum values. Specifies the list of task attempt termination causes, + * which have to be suppressed from being logged to ATS. The valid filters are defined in the + * enum TaskAttemptTerminationCause. The filters are applied only if tez.history.logging.log.level + * is set to TASK_ATTEMPT. + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty + public static final String TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS = + TEZ_PREFIX + "history.logging.taskattempt-filters"; + + /** * Comma separated list of Integers. These are the values that were set for the config value * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so * that the groupIds generated previously will continue to be generated by the plugin. If an older http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index b8c05e7..5f90875 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -26,8 +26,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.charset.Charset; import java.util.BitSet; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -42,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.log4j.Appender; import org.apache.tez.dag.api.DagTypeConverters; @@ -338,6 +341,22 @@ public class TezUtilsInternal { } } + public static <T extends Enum<T>> Set<T> getEnums(Configuration conf, String confName, + Class<T> enumType, String defaultValues) { + String[] names = conf.getStrings(confName); + if (names == null) { + names = StringUtils.getStrings(defaultValues); + } + if (names == null) { + return null; + } + Set<T> enums = new HashSet<>(); + for (String name : names) { + enums.add(Enum.valueOf(enumType, name)); + } + return enums; + } + @Private public static void setHadoopCallerContext(HadoopShim hadoopShim, TezTaskAttemptID attemptID) { hadoopShim.setHadoopCallerContext("tez_ta:" + attemptID.toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index 042d022..79d1fc3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.history; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -27,13 +28,18 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.HistoryLogLevel; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.logging.HistoryLoggingService; import org.apache.tez.dag.history.recovery.RecoveryService; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; public class HistoryEventHandler extends CompositeService { @@ -45,8 +51,13 @@ public class HistoryEventHandler extends CompositeService { private HistoryLoggingService historyLoggingService; private HistoryLogLevel amHistoryLogLevel; - private Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel = - new ConcurrentHashMap<TezDAGID, HistoryLogLevel>(); + private final Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel = new ConcurrentHashMap<>(); + private Set<TaskAttemptTerminationCause> amTaskAttemptFilters; + private final Map<TezDAGID, Set<TaskAttemptTerminationCause>> dagIdToTaskAttemptFilters = + new ConcurrentHashMap<>(); + + private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents = + new ConcurrentHashMap<>(); public HistoryEventHandler(AppContext context) { super(HistoryEventHandler.class.getName()); @@ -80,6 +91,11 @@ public class HistoryEventHandler extends CompositeService { } amHistoryLogLevel = HistoryLogLevel.getLogLevel(context.getAMConf(), HistoryLogLevel.DEFAULT); + amTaskAttemptFilters = TezUtilsInternal.getEnums( + context.getAMConf(), + TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, + TaskAttemptTerminationCause.class, + null); super.serviceInit(conf); } @@ -108,15 +124,20 @@ public class HistoryEventHandler extends CompositeService { if(dagId != null) { dagIdStr = dagId.toString(); } + HistoryEvent historyEvent = event.getHistoryEvent(); if (LOG.isDebugEnabled()) { LOG.debug("Handling history event" - + ", eventType=" + event.getHistoryEvent().getEventType()); + + ", eventType=" + historyEvent.getEventType()); } - if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) { + if (recoveryEnabled && historyEvent.isRecoveryEvent()) { recoveryService.handle(event); } - if (event.getHistoryEvent().isHistoryEvent() && shouldLogEvent(event)) { + if (historyEvent.isHistoryEvent() && shouldLogEvent(event)) { + DAGHistoryEvent suppressedEvent = getSupressedEvent(historyEvent); + if (suppressedEvent != null) { + historyLoggingService.handle(suppressedEvent); + } historyLoggingService.handle(event); } @@ -140,23 +161,86 @@ public class HistoryEventHandler extends CompositeService { } HistoryEvent historyEvent = event.getHistoryEvent(); - if (historyEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) { - dagLogLevel = HistoryLogLevel.getLogLevel(((DAGSubmittedEvent)historyEvent).getConf(), - amHistoryLogLevel); + HistoryEventType eventType = historyEvent.getEventType(); + if (eventType == HistoryEventType.DAG_SUBMITTED) { + Configuration dagConf = ((DAGSubmittedEvent)historyEvent).getConf(); + dagLogLevel = HistoryLogLevel.getLogLevel(dagConf, amHistoryLogLevel); dagIdToLogLevel.put(dagId, dagLogLevel); - } else if (historyEvent.getEventType() == HistoryEventType.DAG_RECOVERED) { + maybeUpdateDagTaskAttemptFilters(dagId, dagLogLevel, dagConf); + } else if (eventType == HistoryEventType.DAG_RECOVERED) { if (context.getCurrentDAG() != null) { - dagLogLevel = HistoryLogLevel.getLogLevel(context.getCurrentDAG().getConf(), - amHistoryLogLevel); + Configuration dagConf = context.getCurrentDAG().getConf(); + dagLogLevel = HistoryLogLevel.getLogLevel(dagConf, amHistoryLogLevel); dagIdToLogLevel.put(dagId, dagLogLevel); + maybeUpdateDagTaskAttemptFilters(dagId, dagLogLevel, dagConf); + } + } else if (eventType == HistoryEventType.DAG_FINISHED) { + dagIdToLogLevel.remove(dagId); + dagIdToTaskAttemptFilters.remove(dagId); + suppressedEvents.clear(); + } + + if (dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel())) { + return shouldLogTaskAttemptEvents(event, dagLogLevel); + } + return false; + } + + // If the log level is set to TASK_ATTEMPT and filters are configured, then we should suppress + // the start event and publish it only when TaskAttemptFinishedEvent is received after + // matching against the filter. + // Note: if the AM is killed before we get the TaskAttemptFinishedEvent, we'll lose this event. + private boolean shouldLogTaskAttemptEvents(DAGHistoryEvent event, HistoryLogLevel dagLogLevel) { + HistoryEvent historyEvent = event.getHistoryEvent(); + HistoryEventType eventType = historyEvent.getEventType(); + if (dagLogLevel == HistoryLogLevel.TASK_ATTEMPT && + (eventType == HistoryEventType.TASK_ATTEMPT_STARTED || + eventType == HistoryEventType.TASK_ATTEMPT_FINISHED)) { + TezDAGID dagId = event.getDagID(); + Set<TaskAttemptTerminationCause> filters = null; + if (dagId != null) { + filters = dagIdToTaskAttemptFilters.get(dagId); + } + if (filters == null) { + filters = amTaskAttemptFilters; + } + if (filters == null) { + return true; } - } else if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) { - if (dagIdToLogLevel.containsKey(dagId)) { - dagIdToLogLevel.remove(dagId); + if (eventType == HistoryEventType.TASK_ATTEMPT_STARTED) { + suppressedEvents.put(((TaskAttemptStartedEvent)historyEvent).getTaskAttemptID(), event); + return false; + } else { // TaskAttemptFinishedEvent + TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent)historyEvent; + if (filters.contains(finishedEvent.getTaskAttemptError())) { + suppressedEvents.remove(finishedEvent.getTaskAttemptID()); + return false; + } } } + return true; + } - return dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel()); + private void maybeUpdateDagTaskAttemptFilters(TezDAGID dagId, HistoryLogLevel dagLogLevel, + Configuration dagConf) { + if (dagLogLevel == HistoryLogLevel.TASK_ATTEMPT) { + Set<TaskAttemptTerminationCause> filters = TezUtilsInternal.getEnums( + dagConf, + TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, + TaskAttemptTerminationCause.class, + null); + if (filters != null) { + dagIdToTaskAttemptFilters.put(dagId, filters); + } + } + } + + private DAGHistoryEvent getSupressedEvent(HistoryEvent historyEvent) { + if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) { + TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent)historyEvent; + return suppressedEvents.remove(finishedEvent.getTaskAttemptID()); + } + return null; } public void handle(DAGHistoryEvent event) { http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index a41d0e6..a536fdf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -36,8 +36,8 @@ public enum HistoryEventType { VERTEX_FINISHED(HistoryLogLevel.VERTEX), TASK_STARTED(HistoryLogLevel.TASK), TASK_FINISHED(HistoryLogLevel.TASK), - TASK_ATTEMPT_STARTED(HistoryLogLevel.ALL), - TASK_ATTEMPT_FINISHED(HistoryLogLevel.ALL), + TASK_ATTEMPT_STARTED(HistoryLogLevel.TASK_ATTEMPT), + TASK_ATTEMPT_FINISHED(HistoryLogLevel.TASK_ATTEMPT), CONTAINER_LAUNCHED(HistoryLogLevel.ALL), CONTAINER_STOPPED(HistoryLogLevel.ALL), DAG_COMMIT_STARTED(HistoryLogLevel.DAG), http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java index c8a076d..4c0fe3f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java @@ -32,23 +32,29 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.dag.api.HistoryLogLevel; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.HadoopShim; +import org.junit.Before; import org.junit.Test; public class TestHistoryEventHandler { @@ -56,42 +62,69 @@ public class TestHistoryEventHandler { private static ApplicationId appId = ApplicationId.newInstance(1000l, 1); private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); private static String user = "TEST_USER"; + private Configuration baseConfig; + + @Before + public void setupConfig() { + baseConfig = new Configuration(false); + } @Test public void testAll() { - testLogLevel(null, 6); + testLogLevel(null, 11); testLogLevel(HistoryLogLevel.NONE, 0); testLogLevel(HistoryLogLevel.AM, 1); testLogLevel(HistoryLogLevel.DAG, 3); testLogLevel(HistoryLogLevel.VERTEX, 4); testLogLevel(HistoryLogLevel.TASK, 5); - testLogLevel(HistoryLogLevel.ALL, 6); + testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 9); + testLogLevel(HistoryLogLevel.ALL, 11); + } + + @Test + public void testTaskAttemptFilters() { + baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, + "EXTERNAL_PREEMPTION,INTERRUPTED_BY_USER"); + testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 5); + testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 5); + + baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, + "EXTERNAL_PREEMPTION"); + testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 7); + testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 7); + + baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, "INTERNAL_PREEMPTION"); + testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 9); + testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 9); } @Test public void testWithDAGRecovery() { - testLogLevelWithRecovery(null, 6); + testLogLevelWithRecovery(null, 11); testLogLevelWithRecovery(HistoryLogLevel.AM, 1); testLogLevelWithRecovery(HistoryLogLevel.DAG, 3); testLogLevelWithRecovery(HistoryLogLevel.VERTEX, 4); testLogLevelWithRecovery(HistoryLogLevel.TASK, 5); - testLogLevelWithRecovery(HistoryLogLevel.ALL, 6); + testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 9); + testLogLevelWithRecovery(HistoryLogLevel.ALL, 11); } @Test public void testMultipleDag() { - testLogLevel(null, HistoryLogLevel.NONE, 7); - testLogLevel(null, HistoryLogLevel.AM, 7); - testLogLevel(null, HistoryLogLevel.DAG, 9); - testLogLevel(null, HistoryLogLevel.VERTEX, 10); - testLogLevel(null, HistoryLogLevel.TASK, 11); - testLogLevel(null, HistoryLogLevel.ALL, 12); + testLogLevel(null, HistoryLogLevel.NONE, 14); + testLogLevel(null, HistoryLogLevel.AM, 14); + testLogLevel(null, HistoryLogLevel.DAG, 16); + testLogLevel(null, HistoryLogLevel.VERTEX, 17); + testLogLevel(null, HistoryLogLevel.TASK, 18); + testLogLevel(null, HistoryLogLevel.TASK_ATTEMPT, 22); + testLogLevel(null, HistoryLogLevel.ALL, 22); testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.NONE, 5); testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.AM, 5); testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.DAG, 7); testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.VERTEX, 8); testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK, 9); - testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 10); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK_ATTEMPT, 13); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 13); testLogLevel(HistoryLogLevel.NONE, HistoryLogLevel.NONE, 0); } @@ -153,7 +186,7 @@ public class TestHistoryEventHandler { } private HistoryEventHandler createHandler(HistoryLogLevel logLevel) { - Configuration conf = new Configuration(false); + Configuration conf = new Configuration(baseConfig); conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, InMemoryHistoryLoggingService.class.getName()); @@ -181,6 +214,7 @@ public class TestHistoryEventHandler { long time = System.currentTimeMillis(); Configuration conf = new Configuration(inConf); + historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, user))); historyEvents.add(new DAGHistoryEvent(dagId, @@ -189,16 +223,33 @@ public class TestHistoryEventHandler { TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); + ContainerId containerId = ContainerId.newContainerId(attemptId, dagId.getId()); TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); historyEvents.add(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, "test", time, time))); + historyEvents.add( + new DAGHistoryEvent(new ContainerLaunchedEvent(containerId, time, attemptId))); historyEvents.add(new DAGHistoryEvent(dagId, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time, - ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null, - null, null))); + containerId, NodeId.newInstance("localhost", 8765), null, null, null))); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskAttemptFinishedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time, + time + 1, TaskAttemptState.KILLED, null, + TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, "", null, null, null, time, null, time, + containerId, NodeId.newInstance("localhost", 8765), null, null, null))); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 2), "test", time, + containerId, NodeId.newInstance("localhost", 8765), null, null, null))); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskAttemptFinishedEvent(TezTaskAttemptID.getInstance(tezTaskID, 2), "test", time + 2, + time + 3, TaskAttemptState.KILLED, null, + TaskAttemptTerminationCause.INTERRUPTED_BY_USER, "", null, null, null, time, null, + time + 2, containerId, NodeId.newInstance("localhost", 8765), null, null, null))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, time, time, DAGState.SUCCEEDED, null, null, user, "test", null, attemptId, DAGPlan.getDefaultInstance()))); + historyEvents.add( + new DAGHistoryEvent(new ContainerStoppedEvent(containerId, time + 4, 0, attemptId))); return historyEvents; } }
