TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/28f30b0e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/28f30b0e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/28f30b0e Branch: refs/heads/master Commit: 28f30b0ef654f124713acb7a213e37bbc7d8b486 Parents: c4487f9 Author: Jeff Zhang <[email protected]> Authored: Wed Nov 25 22:01:44 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Wed Nov 25 22:01:44 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 8 + .../apache/tez/dag/api/event/VertexState.java | 7 +- .../records/TaskAttemptTerminationCause.java | 1 + .../tez/dag/api/client/VertexStatusBuilder.java | 2 - .../java/org/apache/tez/dag/app/AppContext.java | 4 + .../org/apache/tez/dag/app/DAGAppMaster.java | 38 +- .../org/apache/tez/dag/app/RecoveryParser.java | 721 ++++++--- .../tez/dag/app/TaskCommunicatorManager.java | 79 +- .../java/org/apache/tez/dag/app/dag/DAG.java | 3 - .../java/org/apache/tez/dag/app/dag/Task.java | 3 - .../org/apache/tez/dag/app/dag/TaskAttempt.java | 3 - .../java/org/apache/tez/dag/app/dag/Vertex.java | 3 - .../org/apache/tez/dag/app/dag/VertexState.java | 1 - .../dag/app/dag/event/DAGEventRecoverEvent.java | 23 +- .../tez/dag/app/dag/event/RecoveryEvent.java | 23 + .../event/TaskAttemptEventAttemptFailed.java | 15 +- .../event/TaskAttemptEventAttemptKilled.java | 16 +- .../dag/event/TaskAttemptEventKillRequest.java | 14 +- .../event/TaskAttemptEventStartedRemotely.java | 14 +- .../event/TaskAttemptEventTezEventUpdate.java | 37 + .../dag/app/dag/event/TaskAttemptEventType.java | 4 +- .../dag/app/dag/event/TaskEventRecoverTask.java | 53 - .../app/dag/event/TaskEventScheduleTask.java | 14 +- .../dag/app/dag/event/TaskEventTermination.java | 16 +- .../tez/dag/app/dag/event/TaskEventType.java | 3 - .../app/dag/event/VertexEventRecoverVertex.java | 1 - .../app/dag/event/VertexEventRouteEvent.java | 12 - .../event/VertexEventSourceVertexRecovered.java | 62 - .../tez/dag/app/dag/event/VertexEventType.java | 3 - .../apache/tez/dag/app/dag/impl/DAGImpl.java | 364 ++--- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 273 ++-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 384 ++--- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1426 ++++------------ .../tez/dag/history/HistoryEventHandler.java | 5 +- .../tez/dag/history/HistoryEventType.java | 3 +- .../tez/dag/history/RecoveryConverters.java | 27 + .../events/TaskAttemptFinishedEvent.java | 26 +- .../events/VertexConfigurationDoneEvent.java | 211 +++ .../events/VertexGroupCommitFinishedEvent.java | 26 +- .../events/VertexGroupCommitStartedEvent.java | 25 +- .../history/events/VertexInitializedEvent.java | 35 +- .../events/VertexParallelismUpdatedEvent.java | 204 --- .../VertexRecoverableEventsGeneratedEvent.java | 224 --- .../impl/HistoryEventJsonConversion.java | 81 +- .../dag/history/recovery/RecoveryService.java | 10 +- .../tez/dag/history/utils/TezEventUtils.java | 131 ++ tez-dag/src/main/proto/HistoryEvents.proto | 31 +- .../dag/api/client/TestVertexStatusBuilder.java | 7 +- .../apache/tez/dag/app/TestRecoveryParser.java | 480 +++++- .../dag/app/TestTaskCommunicatorManager1.java | 56 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 1527 ++++++++++++------ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 5 +- .../app/dag/impl/TestTaskAttemptRecovery.java | 327 ---- .../tez/dag/app/dag/impl/TestTaskImpl.java | 5 +- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 873 ---------- .../tez/dag/app/dag/impl/TestVertexImpl.java | 33 +- .../dag/app/dag/impl/TestVertexRecovery.java | 1340 --------------- .../TestHistoryEventsProtoConversion.java | 220 ++- .../impl/TestHistoryEventJsonConversion.java | 27 +- .../org/apache/tez/examples/TezExampleBase.java | 12 + .../ats/HistoryEventTimelineConversion.java | 18 +- .../ats/TestHistoryEventTimelineConversion.java | 29 +- .../apache/tez/runtime/api/impl/TezEvent.java | 2 +- .../apache/tez/test/AMShutdownController.java | 57 + .../RecoveryServiceWithEventHandlingHook.java | 386 +++++ .../org/apache/tez/test/TestDAGRecovery.java | 62 - .../java/org/apache/tez/test/TestRecovery.java | 484 ++++++ .../java/org/apache/tez/test/TestTezJobs.java | 6 +- .../apache/tez/test/dag/MultiAttemptDAG.java | 2 +- 70 files changed, 4639 insertions(+), 5989 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 59847ef..b9a91f8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-2581. Umbrella for Tez Recovery Redesign TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in counters page without any delay http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 0ea8999..fabc256 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1493,4 +1493,12 @@ public class TezConfiguration extends Configuration { @ConfigurationProperty(type="boolean") public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + "client.asynchronous-stop"; public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true; + + // for Recovery Test + @Private + @ConfigurationScope(Scope.TEST) + public static final String TEZ_AM_RECOVERY_SERVICE_CLASS = + TEZ_PREFIX + "test.recovery-service-class"; + @Private + public static final String TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT = "org.apache.tez.dag.history.recovery.RecoveryService"; } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java index c9c2d58..86e70a1 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java @@ -56,5 +56,10 @@ public enum VertexState { * further. Listeners can depend on the vertex's configured state after * receiving this notification. */ - CONFIGURED + CONFIGURED, + + /** + * Indicates that the Vertex move to INITIALIZING + */ + INITIALIZING } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java index a5214fb..14eaa3a 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java @@ -23,6 +23,7 @@ public enum TaskAttemptTerminationCause { TERMINATED_BY_CLIENT, // Killed by client command TERMINATED_AT_SHUTDOWN, // Killed due execution shutdown + TERMINATED_AT_RECOVERY, // Killed in recovery, due to can not recover running task attempt INTERNAL_PREEMPTION, // Killed by Tez to makes space for higher pri work EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java index ada3490..4de321c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java @@ -65,8 +65,6 @@ public class VertexStatusBuilder extends VertexStatus { return VertexStatusStateProto.VERTEX_NEW; case INITIALIZING: return VertexStatusStateProto.VERTEX_INITIALIZING; - case RECOVERING: - return VertexStatusStateProto.VERTEX_NEW; case INITED: return VertexStatusStateProto.VERTEX_INITED; case RUNNING: http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 68453b1..30716da 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; +import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.dag.app.rm.container.AMContainerMap; @@ -76,6 +77,8 @@ public interface AppContext { void setDAG(DAG dag); + void setDAGRecoveryData(DAGRecoveryData dagRecoveryData); + Set<String> getAllDAGIDs(); @SuppressWarnings("rawtypes") @@ -126,4 +129,5 @@ public interface AppContext { public HadoopShim getHadoopShim(); + public DAGRecoveryData getDAGRecoveryData(); } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 2c50264..23981e7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -127,7 +127,7 @@ import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; -import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData; +import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.Task; @@ -151,6 +151,10 @@ import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl; import org.apache.tez.dag.app.launcher.ContainerLauncherManager; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; +import org.apache.tez.dag.app.dag.impl.TaskImpl; +import org.apache.tez.dag.app.dag.impl.VertexImpl; +import org.apache.tez.dag.app.launcher.LocalContainerLauncher; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.ContainerLauncherEventType; import org.apache.tez.dag.app.rm.TaskSchedulerManager; @@ -1417,6 +1421,7 @@ public class DAGAppMaster extends AbstractService { private class RunningAppContext implements AppContext { private DAG dag; + private DAGRecoveryData dagRecoveryData; private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -1633,6 +1638,7 @@ public class DAGAppMaster extends AbstractService { try { wLock.lock(); this.dag = dag; + this.dagRecoveryData = null; } finally { wLock.unlock(); } @@ -1647,6 +1653,16 @@ public class DAGAppMaster extends AbstractService { public long getCumulativeGCTime() { return getAMGCTime(); } + + @Override + public void setDAGRecoveryData(DAGRecoveryData dagRecoveryData) { + this.dagRecoveryData = dagRecoveryData; + } + + @Override + public DAGRecoveryData getDAGRecoveryData() { + return dagRecoveryData; + } } private static class ServiceWithDependency implements ServiceStateChangeListener { @@ -1818,7 +1834,7 @@ public class DAGAppMaster extends AbstractService { } } - private RecoveredDAGData recoverDAG() throws IOException, TezException { + private DAGRecoveryData recoverDAG() throws IOException, TezException { if (recoveryEnabled) { if (this.appAttemptID.getAttemptId() > 1) { LOG.info("Recovering data from previous attempts" @@ -1826,7 +1842,7 @@ public class DAGAppMaster extends AbstractService { this.state = DAGAppMasterState.RECOVERING; RecoveryParser recoveryParser = new RecoveryParser( this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId()); - RecoveredDAGData recoveredDAGData = recoveryParser.parseRecoveryData(); + DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData(); return recoveredDAGData; } } @@ -1855,7 +1871,7 @@ public class DAGAppMaster extends AbstractService { this.lastDAGCompletionTime = clock.getTime(); - RecoveredDAGData recoveredDAGData; + DAGRecoveryData recoveredDAGData; try { recoveredDAGData = recoverDAG(); } catch (IOException e) { @@ -1875,9 +1891,8 @@ public class DAGAppMaster extends AbstractService { } if (recoveredDAGData != null) { - List<URL> classpathUrls = null; if (recoveredDAGData.cumulativeAdditionalResources != null) { - classpathUrls = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources); + recoveredDAGData.additionalUrlsForClasspath = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources); amResources.putAll(recoveredDAGData.cumulativeAdditionalResources); cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources); } @@ -1900,9 +1915,11 @@ public class DAGAppMaster extends AbstractService { + ", failureReason=" + recoveredDAGData.reason); _updateLoggers(recoveredDAGData.recoveredDAG, ""); if (recoveredDAGData.nonRecoverable) { + addDiagnostic("DAG " + recoveredDAGData.recoveredDagID + " can not be recovered due to " + + recoveredDAGData.reason); DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(), - DAGState.FAILED, classpathUrls); + DAGState.FAILED, recoveredDAGData); DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID, recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), recoveredDAGData.recoveredDAG.getUserName(), @@ -1919,7 +1936,7 @@ public class DAGAppMaster extends AbstractService { } else { DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(), - recoveredDAGData.dagState, classpathUrls); + recoveredDAGData.dagState, recoveredDAGData); DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID, recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), @@ -1938,7 +1955,7 @@ public class DAGAppMaster extends AbstractService { this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(), dagRecoveredEvent)); DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent( - recoveredDAGData.recoveredDAG.getID(), classpathUrls); + recoveredDAGData.recoveredDAG.getID(), recoveredDAGData); dagEventDispatcher.handle(recoverDAGEvent); this.state = DAGAppMasterState.RUNNING; } @@ -2050,7 +2067,6 @@ public class DAGAppMaster extends AbstractService { if (dag == null || eventDagIndex != dag.getID().getId()) { return; // event not relevant any more } - Task task = dag.getVertex(event.getTaskID().getVertexID()). getTask(event.getTaskID()); @@ -2432,7 +2448,6 @@ public class DAGAppMaster extends AbstractService { TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT); } - @VisibleForTesting static void parseAllPlugins( List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, Integer> taskSchedulerPluginMap, @@ -2547,4 +2562,5 @@ public class DAGAppMaster extends AbstractService { } return sb.toString(); } + } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 046dbd9..368dd17 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app; import java.io.EOFException; import java.io.IOException; +import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -33,13 +34,14 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.dag.DAGState; -import org.apache.tez.dag.app.dag.Task; -import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.impl.DAGImpl; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -58,21 +60,29 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.recovery.RecoveryService; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; +import org.apache.tez.runtime.api.impl.TezEvent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * RecoverParser is mainly for Tez AM Recovery. It would read the recovery events. (summary & non-summary) + * + */ public class RecoveryParser { private static final Logger LOG = LoggerFactory.getLogger(RecoveryParser.class); @@ -100,7 +110,8 @@ public class RecoveryParser { this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir); } - public static class RecoveredDAGData { + public static class DAGRecoveryData { + public TezDAGID recoveredDagID = null; public DAGImpl recoveredDAG = null; public DAGState dagState = null; @@ -109,6 +120,119 @@ public class RecoveryParser { public boolean isSessionStopped = false; public String reason = null; public Map<String, LocalResource> cumulativeAdditionalResources = null; + public List<URL> additionalUrlsForClasspath = null; + + public Map<TezVertexID, VertexRecoveryData> vertexRecoveryDataMap = + new HashMap<TezVertexID, RecoveryParser.VertexRecoveryData>(); + private DAGInitializedEvent dagInitedEvent; + private DAGStartedEvent dagStartedEvent; + private DAGFinishedEvent dagFinishedEvent; + + private Map<TezVertexID, Boolean> vertexCommitStatus = + new HashMap<TezVertexID, Boolean>(); + private Map<String, Boolean> vertexGroupCommitStatus = + new HashMap<String, Boolean>(); + private Map<TezVertexID, Boolean> vertexGroupMemberCommitStatus = + new HashMap<TezVertexID, Boolean>(); + + public DAGRecoveryData(DAGSummaryData dagSummaryData) { + if (dagSummaryData.completed) { + this.isCompleted = true; + this.dagState = dagSummaryData.dagState; + } + dagSummaryData.checkRecoverableSummary(); + this.nonRecoverable = dagSummaryData.nonRecoverable; + this.reason = dagSummaryData.reason; + this.vertexCommitStatus = dagSummaryData.vertexCommitStatus; + this.vertexGroupCommitStatus = dagSummaryData.vertexGroupCommitStatus; + this.vertexGroupMemberCommitStatus = dagSummaryData.vertexGroupMemberCommitStatus; + } + + // DAG is not recoverable if vertex has committer and has completed the commit (based on summary recovery events) + // but its full recovery events are not seen. (based on non-summary recovery events) + // Unrecoverable reason: vertex is committed we cannot rerun it and if vertex recovery events are not completed + // we cannot run other vertices that may depend on this one. So we have to abort. + public void checkRecoverableNonSummary() { + // It is OK without full recovering events if the dag is completed based on summary event. + if (isCompleted) { + return; + } + for (Map.Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) { + // vertex has finished committing + TezVertexID vertexId = entry.getKey(); + boolean commitFinished = entry.getValue(); + if(commitFinished + && (!vertexRecoveryDataMap.containsKey(vertexId) + || vertexRecoveryDataMap.get(vertexId).getVertexFinishedEvent() == null)) { + this.nonRecoverable = true; + this.reason = "Vertex has been committed, but its full recovery events are not seen, vertexId=" + + vertexId; + return; + } + } + for (Map.Entry<TezVertexID, Boolean> entry : vertexGroupMemberCommitStatus.entrySet()) { + // vertex has finished committing + TezVertexID vertexId = entry.getKey(); + boolean commitFinished = entry.getValue(); + if(commitFinished + && (!vertexRecoveryDataMap.containsKey(vertexId) + || vertexRecoveryDataMap.get(vertexId).getVertexFinishedEvent() == null)) { + this.nonRecoverable = true; + this.reason = "Vertex has been committed as member of vertex group" + + ", but its full recovery events are not seen, vertexId=" + vertexId; + return; + } + } + } + + public DAGInitializedEvent getDAGInitializedEvent() { + return dagInitedEvent; + } + + public DAGStartedEvent getDAGStartedEvent() { + return dagStartedEvent; + } + + public DAGFinishedEvent getDAGFinishedEvent() { + return dagFinishedEvent; + } + + public boolean isVertexGroupCommitted(String groupName) { + return vertexGroupCommitStatus.containsKey(groupName) + && vertexGroupCommitStatus.get(groupName); + } + + public VertexRecoveryData getVertexRecoveryData(TezVertexID vertexId) { + return vertexRecoveryDataMap.get(vertexId); + } + + public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) { + VertexRecoveryData vertexRecoveryData = getVertexRecoveryData(taskId.getVertexID()); + if (vertexRecoveryData != null) { + return vertexRecoveryData.taskRecoveryDataMap.get(taskId); + } else { + return null; + } + } + + public TaskAttemptRecoveryData getTaskAttemptRecoveryData(TezTaskAttemptID taId) { + TaskRecoveryData taskRecoveryData = getTaskRecoveryData(taId.getTaskID()); + if (taskRecoveryData != null) { + return taskRecoveryData.taRecoveryDataMap.get(taId); + } else { + return null; + } + } + + public VertexRecoveryData maybeCreateVertexRecoveryData(TezVertexID vertexId) { + VertexRecoveryData vRecoveryData = vertexRecoveryDataMap.get(vertexId); + if (vRecoveryData == null) { + vRecoveryData = new VertexRecoveryData(vertexCommitStatus.containsKey(vertexId) + ? vertexCommitStatus.get(vertexId) : false); + vertexRecoveryDataMap.put(vertexId, vRecoveryData); + } + return vRecoveryData; + } } private static void parseSummaryFile(FSDataInputStream inputStream) @@ -178,12 +302,12 @@ public class RecoveryParser { case VERTEX_INITIALIZED: event = new VertexInitializedEvent(); break; + case VERTEX_CONFIGURE_DONE: + event = new VertexConfigurationDoneEvent(); + break; case VERTEX_STARTED: event = new VertexStartedEvent(); break; - case VERTEX_PARALLELISM_UPDATED: - event = new VertexParallelismUpdatedEvent(); - break; case VERTEX_COMMIT_STARTED: event = new VertexCommitStartedEvent(); break; @@ -208,18 +332,11 @@ public class RecoveryParser { case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(); break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - event = new VertexRecoverableEventsGeneratedEvent(); - break; default: throw new IOException("Invalid data found, unknown event type " + eventType); } - if (LOG.isDebugEnabled()) { - LOG.debug("Parsing event from input stream" - + ", eventType=" + eventType); - } try { event.fromProtoStream(inputStream); } catch (EOFException eof) { @@ -233,10 +350,6 @@ public class RecoveryParser { return event; } - - - - public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream) throws IOException { List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>(); @@ -246,11 +359,45 @@ public class RecoveryParser { LOG.info("Reached end of stream"); break; } + LOG.debug("Read HistoryEvent, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent); historyEvents.add(historyEvent); } return historyEvents; } + public static List<HistoryEvent> readRecoveryEvents(TezConfiguration tezConf, ApplicationId appId, + int attempt) throws IOException { + Path tezSystemStagingDir = + TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString()); + Path recoveryDataDir = + TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf); + FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf); + List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>(); + for (int i=1; i <= attempt; ++i) { + Path currentAttemptRecoveryDataDir = + TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i); + Path recoveryFilePath = + new Path(currentAttemptRecoveryDataDir, appId.toString().replace( + "application", "dag") + + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); + if (fs.exists(recoveryFilePath)) { + LOG.info("Read recovery file:" + recoveryFilePath); + FSDataInputStream in = null; + try { + in = fs.open(recoveryFilePath); + historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in)); + } catch (IOException e) { + throw e; + } finally { + if (in != null) { + in.close(); + } + } + } + } + return historyEvents; + } + public static void main(String argv[]) throws IOException { // TODO clean up with better usage and error handling Configuration conf = new Configuration(); @@ -325,13 +472,15 @@ public class RecoveryParser { final TezDAGID dagId; boolean completed = false; boolean dagCommitCompleted = true; + boolean nonRecoverable = false; + String reason; DAGState dagState; - Map<TezVertexID, Boolean> vertexCommitStatus = + public Map<TezVertexID, Boolean> vertexCommitStatus = new HashMap<TezVertexID, Boolean>(); - Map<String, Boolean> vertexGroupCommitStatus = + public Map<String, Boolean> vertexGroupCommitStatus = new HashMap<String, Boolean>(); - List<HistoryEvent> bufferedSummaryEvents = - new ArrayList<HistoryEvent>(); + public Map<TezVertexID, Boolean> vertexGroupMemberCommitStatus = + new HashMap<TezVertexID, Boolean>(); DAGSummaryData(TezDAGID dagId) { this.dagId = dagId; @@ -356,7 +505,6 @@ public class RecoveryParser { case DAG_KILL_REQUEST: DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent(); killRequestEvent.fromSummaryProtoStream(proto); - bufferedSummaryEvents.add(killRequestEvent); break; case DAG_COMMIT_STARTED: dagCommitCompleted = false; @@ -375,24 +523,27 @@ public class RecoveryParser { if (vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) { vertexCommitStatus.put( vertexFinishedEvent.getVertexID(), true); - bufferedSummaryEvents.add(vertexFinishedEvent); } break; case VERTEX_GROUP_COMMIT_STARTED: VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = new VertexGroupCommitStartedEvent(); vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto); - bufferedSummaryEvents.add(vertexGroupCommitStartedEvent); vertexGroupCommitStatus.put( vertexGroupCommitStartedEvent.getVertexGroupName(), false); + for (TezVertexID member : vertexGroupCommitStartedEvent.getVertexIds()) { + vertexGroupMemberCommitStatus.put(member, false); + } break; case VERTEX_GROUP_COMMIT_FINISHED: VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = new VertexGroupCommitFinishedEvent(); vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto); - bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent); vertexGroupCommitStatus.put( vertexGroupCommitFinishedEvent.getVertexGroupName(), true); + for (TezVertexID member : vertexGroupCommitFinishedEvent.getVertexIds()) { + vertexGroupMemberCommitStatus.put(member, true); + } break; default: String message = "Found invalid summary event that was not handled" @@ -401,6 +552,37 @@ public class RecoveryParser { } } + // Check whether DAG is recoverable based on DAGSummaryData + // 1. Whether vertex is in the middle of committing + // 2. Whether vertex group is in the middle of committing + private void checkRecoverableSummary() { + if (!dagCommitCompleted) { + this.nonRecoverable = true; + this.reason = "DAG Commit was in progress, not recoverable" + + ", dagId=" + dagId; + } + if (!vertexCommitStatus.isEmpty()) { + for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) { + if (!(entry.getValue().booleanValue())) { + this.nonRecoverable = true; + this.reason = "Vertex Commit was in progress, not recoverable" + + ", dagId=" + dagId + + ", vertexId=" + entry.getKey(); + } + } + } + if (!vertexGroupCommitStatus.isEmpty()) { + for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) { + if (!(entry.getValue().booleanValue())) { + this.nonRecoverable = true; + this.reason = "Vertex Group Commit was in progress, not recoverable" + + ", dagId=" + dagId + + ", vertexGroup=" + entry.getKey(); + } + } + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -426,32 +608,6 @@ public class RecoveryParser { } } - private String isDAGRecoverable(DAGSummaryData data) { - if (!data.dagCommitCompleted) { - return "DAG Commit was in progress, not recoverable" - + ", dagId=" + data.dagId; - } - if (!data.vertexCommitStatus.isEmpty()) { - for (Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) { - if (!(entry.getValue().booleanValue())) { - return "Vertex Commit was in progress, not recoverable" - + ", dagId=" + data.dagId - + ", vertexId=" + entry.getKey(); - } - } - } - if (!data.vertexGroupCommitStatus.isEmpty()) { - for (Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) { - if (!(entry.getValue().booleanValue())) { - return "Vertex Group Commit was in progress, not recoverable" - + ", dagId=" + data.dagId - + ", vertexGroup=" + entry.getKey(); - } - } - } - return null; - } - private List<Path> getSummaryFiles() throws IOException { List<Path> summaryFiles = new ArrayList<Path>(); for (int i = 1; i < currentAttemptId; ++i) { @@ -483,11 +639,22 @@ public class RecoveryParser { return recoveryFiles; } - public RecoveredDAGData parseRecoveryData() throws IOException { + /** + * 1. Read Summary Recovery file and build DAGSummaryData + * Check whether it is recoverable based on the summary file (whether dag is + * in the middle of committing) + * 2. Read the non-Summary Recovery file and build DAGRecoveryData + * Check whether it is recoverable based on both the summary file and non-summary file + * (whether vertex has completed its committing, but its full non-summary recovery events are not seen) + * @return DAGRecoveryData + * @throws IOException + */ + public DAGRecoveryData parseRecoveryData() throws IOException { int dagCounter = 0; Map<TezDAGID, DAGSummaryData> dagSummaryDataMap = new HashMap<TezDAGID, DAGSummaryData>(); List<Path> summaryFiles = getSummaryFiles(); + LOG.debug("SummaryFile size:" + summaryFiles.size()); for (Path summaryFile : summaryFiles) { FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryFile); LOG.info("Parsing summary file" @@ -559,33 +726,24 @@ public class RecoveryParser { LOG.info("Checking if DAG is in recoverable state" + ", dagId=" + lastInProgressDAGData.dagId); - final RecoveredDAGData recoveredDAGData = new RecoveredDAGData(); - if (lastInProgressDAGData.completed) { - recoveredDAGData.isCompleted = true; - recoveredDAGData.dagState = lastInProgressDAGData.dagState; - } - - String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData); - if (nonRecoverableReason != null) { - LOG.warn("Found last inProgress DAG but not recoverable: " - + lastInProgressDAGData); - recoveredDAGData.nonRecoverable = true; - recoveredDAGData.reason = nonRecoverableReason; - } - + final DAGRecoveryData recoveredDAGData = new DAGRecoveryData(lastInProgressDAGData); List<Path> dagRecoveryFiles = getDAGRecoveryFiles(lastInProgressDAG); boolean skipAllOtherEvents = false; Path lastRecoveryFile = null; + // read the non summary events even when it is nonrecoverable. (Just read the DAGSubmittedEvent + // to create the DAGImpl) for (Path dagRecoveryFile : dagRecoveryFiles) { if (skipAllOtherEvents) { LOG.warn("Other recovery files will be skipped due to error in the previous recovery file" + lastRecoveryFile); break; } + FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile); lastRecoveryFile = dagRecoveryFile; LOG.info("Trying to recover dag from recovery file" + ", dagId=" + lastInProgressDAG.toString() - + ", dagRecoveryFile=" + dagRecoveryFile); + + ", dagRecoveryFile=" + dagRecoveryFile + + ", len=" + fileStatus.getLen()); FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize); while (true) { HistoryEvent event; @@ -606,14 +764,14 @@ public class RecoveryParser { // hit an error - skip reading other events break; } + HistoryEventType eventType = event.getEventType(); + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); switch (eventType) { case DAG_SUBMITTED: - { DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event; - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(), lastInProgressDAG); recoveredDAGData.cumulativeAdditionalResources = submittedEvent @@ -624,195 +782,110 @@ public class RecoveryParser { skipAllOtherEvents = true; } break; - } case DAG_INITIALIZED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); + recoveredDAGData.dagInitedEvent = (DAGInitializedEvent)event; break; - } case DAG_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); + recoveredDAGData.dagStartedEvent= (DAGStartedEvent)event; break; - } + case DAG_FINISHED: + recoveredDAGData.dagFinishedEvent = (DAGFinishedEvent)event; + skipAllOtherEvents = true; + break; case DAG_COMMIT_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case CONTAINER_LAUNCHED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } - case VERTEX_GROUP_COMMIT_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); + // Nothing to do for now break; } case DAG_KILL_REQUEST: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - break; - } - case DAG_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - // If this is seen, nothing to recover - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - recoveredDAGData.isCompleted = true; - recoveredDAGData.dagState = - ((DAGFinishedEvent) event).getState(); - skipAllOtherEvents = true; - break; - } - case CONTAINER_LAUNCHED: - { - // Nothing to do for now break; } case VERTEX_INITIALIZED: + { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexInitializedEvent vEvent = (VertexInitializedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); + VertexInitializedEvent vertexInitEvent = (VertexInitializedEvent)event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.maybeCreateVertexRecoveryData(vertexInitEvent.getVertexID()); + vertexRecoveryData.vertexInitedEvent = vertexInitEvent; break; } - case VERTEX_STARTED: + case VERTEX_CONFIGURE_DONE: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexStartedEvent vEvent = (VertexStartedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); + VertexConfigurationDoneEvent reconfigureDoneEvent = (VertexConfigurationDoneEvent)event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.maybeCreateVertexRecoveryData(reconfigureDoneEvent.getVertexID()); + vertexRecoveryData.vertexConfigurationDoneEvent = reconfigureDoneEvent; break; } - case VERTEX_PARALLELISM_UPDATED: + case VERTEX_STARTED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); + VertexStartedEvent vertexStartedEvent = (VertexStartedEvent)event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(vertexStartedEvent.getVertexID()); + Preconditions.checkArgument(vertexRecoveryData != null, "No VertexInitializedEvent before VertexStartedEvent"); + vertexRecoveryData.vertexStartedEvent = vertexStartedEvent; break; } case VERTEX_COMMIT_STARTED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); break; } case VERTEX_FINISHED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexFinishedEvent vEvent = (VertexFinishedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); + VertexFinishedEvent vertexFinishedEvent = (VertexFinishedEvent)event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.maybeCreateVertexRecoveryData(vertexFinishedEvent.getVertexID()); + vertexRecoveryData.vertexFinishedEvent = vertexFinishedEvent; break; } case TASK_STARTED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskStartedEvent tEvent = (TaskStartedEvent) event; - Task task = recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID()); - task.restoreFromEvent(tEvent); + TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID()); + Preconditions.checkArgument(vertexRecoveryData != null, + "Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID()); + TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID()); + taskRecoveryData.taskStartedEvent = taskStartedEvent; break; } case TASK_FINISHED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskFinishedEvent tEvent = (TaskFinishedEvent) event; - Task task = recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID()); - task.restoreFromEvent(tEvent); + TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID()); + Preconditions.checkArgument(vertexRecoveryData != null, + "Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID()); + TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID()); + taskRecoveryData.taskFinishedEvent = taskFinishedEvent; break; } case TASK_ATTEMPT_STARTED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event; - Task task = - recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskAttemptID().getTaskID().getVertexID()) - .getTask(tEvent.getTaskAttemptID().getTaskID()); - task.restoreFromEvent(tEvent); + TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get( + taStartedEvent.getTaskAttemptID().getTaskID().getVertexID()); + Preconditions.checkArgument(vertexRecoveryData != null, + "Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID()); + TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap + .get(taStartedEvent.getTaskAttemptID().getTaskID()); + Preconditions.checkArgument(taskRecoveryData != null, + "Invalid TaskAttemptStartedEvent, its taskId does not exist, taId=" + taStartedEvent.getTaskAttemptID()); + TaskAttemptRecoveryData taRecoveryData = taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taStartedEvent.getTaskAttemptID()); + taRecoveryData.taStartedEvent = taStartedEvent; break; } case TASK_ATTEMPT_FINISHED: { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event; - Task task = - recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskAttemptID().getTaskID().getVertexID()) - .getTask(tEvent.getTaskAttemptID().getTaskID()); - task.restoreFromEvent(tEvent); - break; - } - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexRecoverableEventsGeneratedEvent vEvent = - (VertexRecoverableEventsGeneratedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); + TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event; + VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get( + taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID()); + Preconditions.checkArgument(vertexRecoveryData != null, + "Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID()); + TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap + .get(taFinishedEvent.getTaskAttemptID().getTaskID()); + Preconditions.checkArgument(taskRecoveryData != null, + "Invalid TaskAttemptFinishedEvent, its taskId does not exist, taId=" + taFinishedEvent.getTaskAttemptID()); + TaskAttemptRecoveryData taRecoveryData = taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taFinishedEvent.getTaskAttemptID()); + taRecoveryData.taFinishedEvent = taFinishedEvent; break; } default: @@ -828,49 +901,185 @@ public class RecoveryParser { } dagRecoveryStream.close(); } + recoveredDAGData.checkRecoverableNonSummary(); + return recoveredDAGData; + } - if (!recoveredDAGData.isCompleted - && !recoveredDAGData.nonRecoverable) { - if (lastInProgressDAGData.bufferedSummaryEvents != null - && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) { - for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) { - assert recoveredDAGData.recoveredDAG != null; - switch (bufferedEvent.getEventType()) { - case VERTEX_GROUP_COMMIT_STARTED: - recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent); - break; - case VERTEX_GROUP_COMMIT_FINISHED: - recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent); - break; - case VERTEX_FINISHED: - VertexFinishedEvent vertexFinishedEvent = - (VertexFinishedEvent) bufferedEvent; - Vertex vertex = recoveredDAGData.recoveredDAG.getVertex( - vertexFinishedEvent.getVertexID()); - if (vertex == null) { - recoveredDAGData.nonRecoverable = true; - recoveredDAGData.reason = "All state could not be recovered" - + ", vertex completed but events not flushed" - + ", vertexId=" + vertexFinishedEvent.getVertexID(); - } else { - vertex.restoreFromEvent(vertexFinishedEvent); - } - break; - case DAG_KILL_REQUEST: - DAGKillRequestEvent killRequestEvent = (DAGKillRequestEvent)bufferedEvent; - recoveredDAGData.isSessionStopped = killRequestEvent.isSessionStopped(); - recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent); - break; - default: - throw new RuntimeException("Invalid data found in buffered summary events" - + ", unknown event type " - + bufferedEvent.getEventType()); - } - } + public static class VertexRecoveryData { + + private VertexInitializedEvent vertexInitedEvent; + private VertexConfigurationDoneEvent vertexConfigurationDoneEvent; + private VertexStartedEvent vertexStartedEvent; + private VertexFinishedEvent vertexFinishedEvent; + private Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = + new HashMap<TezTaskID, RecoveryParser.TaskRecoveryData>(); + private boolean commited; + + @VisibleForTesting + public VertexRecoveryData(VertexInitializedEvent vertexInitedEvent, + VertexConfigurationDoneEvent vertexReconfigureDoneEvent, + VertexStartedEvent vertexStartedEvent, + VertexFinishedEvent vertexFinishedEvent, + Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap, boolean commited) { + super(); + this.vertexInitedEvent = vertexInitedEvent; + this.vertexConfigurationDoneEvent = vertexReconfigureDoneEvent; + this.vertexStartedEvent = vertexStartedEvent; + this.vertexFinishedEvent = vertexFinishedEvent; + this.taskRecoveryDataMap = taskRecoveryDataMap; + this.commited = commited; + } + + public VertexRecoveryData(boolean committed) { + this.commited = committed; + } + + public VertexInitializedEvent getVertexInitedEvent() { + return vertexInitedEvent; + } + + public VertexStartedEvent getVertexStartedEvent() { + return vertexStartedEvent; + } + + public VertexFinishedEvent getVertexFinishedEvent() { + return vertexFinishedEvent; + } + + public VertexConfigurationDoneEvent getVertexConfigurationDoneEvent() { + return vertexConfigurationDoneEvent; + } + + public boolean isReconfigureDone() { + return vertexConfigurationDoneEvent != null; + } + + public boolean isVertexInited() { + return vertexInitedEvent != null; + } + + public boolean shouldSkipInit() { + return vertexInitedEvent != null && vertexConfigurationDoneEvent != null; + } + + public boolean isVertexStarted() { + return vertexStartedEvent != null; + } + + public boolean isVertexSucceeded() { + if (vertexFinishedEvent == null) { + return false; } + return vertexFinishedEvent.getState().equals(VertexState.SUCCEEDED); } - return recoveredDAGData; + public boolean isVertexFinished() { + return vertexFinishedEvent != null; + } + + public boolean isVertexCommitted() { + return this.commited; + } + + public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) { + return taskRecoveryDataMap.get(taskId); + } + + public TaskRecoveryData maybeCreateTaskRecoveryData(TezTaskID taskId) { + TaskRecoveryData taskRecoveryData = taskRecoveryDataMap.get(taskId); + if (taskRecoveryData == null) { + taskRecoveryData = new TaskRecoveryData(); + taskRecoveryDataMap.put(taskId, taskRecoveryData); + } + return taskRecoveryData; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("VertexInitedEvent=" + vertexInitedEvent); + builder.append(""); + return builder.toString(); + } } + public static class TaskRecoveryData { + + private TaskStartedEvent taskStartedEvent; + private TaskFinishedEvent taskFinishedEvent; + private Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap = + new HashMap<TezTaskAttemptID, RecoveryParser.TaskAttemptRecoveryData>(); + + public TaskRecoveryData() { + + } + + @VisibleForTesting + public TaskRecoveryData(TaskStartedEvent taskStartedEvent, + TaskFinishedEvent taskFinishedEvent, + Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap) { + super(); + this.taskStartedEvent = taskStartedEvent; + this.taskFinishedEvent = taskFinishedEvent; + this.taRecoveryDataMap = taRecoveryDataMap; + } + + public TaskStartedEvent getTaskStartedEvent() { + return taskStartedEvent; + } + + public TaskFinishedEvent getTaskFinishedEvent() { + return taskFinishedEvent; + } + + public boolean isTaskStarted() { + return getTaskStartedEvent() != null; + } + + public boolean isTaskAttemptSucceeded(TezTaskAttemptID taId) { + TaskAttemptRecoveryData taRecoveryData = taRecoveryDataMap.get(taId); + return taRecoveryData == null ? false : taRecoveryData.isTaskAttemptSucceeded(); + } + + public TaskAttemptRecoveryData maybeCreateTaskAttemptRecoveryData(TezTaskAttemptID taId) { + TaskAttemptRecoveryData taRecoveryData = taRecoveryDataMap.get(taId); + if (taRecoveryData == null) { + taRecoveryData = new TaskAttemptRecoveryData(); + taRecoveryDataMap.put(taId, taRecoveryData); + } + return taRecoveryData; + } + } + + public static class TaskAttemptRecoveryData { + + private TaskAttemptStartedEvent taStartedEvent; + private TaskAttemptFinishedEvent taFinishedEvent; + + public TaskAttemptRecoveryData() { + + } + + @VisibleForTesting + public TaskAttemptRecoveryData(TaskAttemptStartedEvent taStartedEvent, + TaskAttemptFinishedEvent taFinishedEvent) { + super(); + this.taStartedEvent = taStartedEvent; + this.taFinishedEvent = taFinishedEvent; + } + + public TaskAttemptStartedEvent getTaskAttemptStartedEvent() { + return taStartedEvent; + } + + public TaskAttemptFinishedEvent getTaskAttemptFinishedEvent() { + return taFinishedEvent; + } + + public boolean isTaskAttemptSucceeded() { + TaskAttemptFinishedEvent taFinishedEvent = getTaskAttemptFinishedEvent(); + return taFinishedEvent == null ? + false : taFinishedEvent.getState() == TaskAttemptState.SUCCEEDED; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 924222a..92bf3c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -36,7 +36,9 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.slf4j.Logger; @@ -57,11 +59,14 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TezEvent; @@ -227,12 +232,14 @@ public class TaskCommunicatorManager extends AbstractService implements } long currTime = context.getClock().getTime(); - List<TezEvent> otherEvents = new ArrayList<TezEvent>(); - // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events - // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) - // to VertexImpl to ensure the events ordering - // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent - // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent + // taFinishedEvents - means the TaskAttemptFinishedEvent + // taGeneratedEvents - for recovery, means the events generated by this task attempt and is needed by its downstream vertices + // eventsForVertex - including all the taGeneratedEvents and other events such as INPUT_READ_ERROR_EVENT/INPUT_FAILED_EVENT + // taGeneratedEvents is routed both to TaskAttempt & Vertex. Route to Vertex is for performance consideration + // taFinishedEvents must be routed before taGeneratedEvents + List<TezEvent> taFinishedEvents = new ArrayList<TezEvent>(); + List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>(); + List<TezEvent> eventsForVertex = new ArrayList<TezEvent>(); TaskAttemptEventStatusUpdate taskAttemptEvent = null; boolean readErrorReported = false; for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { @@ -244,21 +251,74 @@ public class TaskCommunicatorManager extends AbstractService implements // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, (TaskStatusUpdateEvent) tezEvent.getEvent()); + } else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT + || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT) { + taFinishedEvents.add(tezEvent); } else { if (eventType == EventType.INPUT_READ_ERROR_EVENT) { readErrorReported = true; } - otherEvents.add(tezEvent); + if (eventType == EventType.DATA_MOVEMENT_EVENT + || eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT + || eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT + || eventType == EventType.VERTEX_MANAGER_EVENT) { + taGeneratedEvents.add(tezEvent); + } + eventsForVertex.add(tezEvent); } } if (taskAttemptEvent != null) { taskAttemptEvent.setReadErrorReported(readErrorReported); context.getEventHandler().handle(taskAttemptEvent); } - if(!otherEvents.isEmpty()) { + // route taGeneratedEvents to TaskAttempt + if (!taGeneratedEvents.isEmpty()) { + context.getEventHandler().handle(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents)); + } + // route events to TaskAttempt + Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent"); + for (TezEvent e : taFinishedEvents) { + EventMetaData sourceMeta = e.getSourceInfo(); + switch (e.getEventType()) { + case TASK_ATTEMPT_FAILED_EVENT: + TaskAttemptTerminationCause errCause = null; + switch (sourceMeta.getEventGenerator()) { + case INPUT: + errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR; + break; + case PROCESSOR: + errCause = TaskAttemptTerminationCause.APPLICATION_ERROR; + break; + case OUTPUT: + errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR; + break; + case SYSTEM: + errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR; + break; + default: + throw new TezUncheckedException("Unknown EventProducerConsumerType: " + + sourceMeta.getEventGenerator()); + } + TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent(); + context.getEventHandler().handle( + new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), + TaskAttemptEventType.TA_FAILED, + "Error: " + taskFailedEvent.getDiagnostics(), + errCause)); + break; + case TASK_ATTEMPT_COMPLETED_EVENT: + context.getEventHandler().handle( + new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE)); + break; + default: + throw new TezUncheckedException("Unhandled tez event type: " + + e.getEventType()); + } + } + if (!eventsForVertex.isEmpty()) { TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); context.getEventHandler().handle( - new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents))); + new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex))); } taskHeartbeatHandler.pinged(taskAttemptID); eventInfo = context @@ -269,6 +329,7 @@ public class TaskCommunicatorManager extends AbstractService implements } return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId()); } + public void taskAlive(TezTaskAttemptID taskAttemptId) { taskHeartbeatHandler.pinged(taskAttemptId); } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 640359d..a01c623 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -34,7 +34,6 @@ import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatusBuilder; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.common.security.ACLManager; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; @@ -90,8 +89,6 @@ public interface DAG { UserGroupInformation getDagUGI(); - DAGState restoreFromEvent(HistoryEvent historyEvent); - ACLManager getACLManager(); Map<String, TezVertexID> getVertexNameIDMapping(); http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java index a011b61..04f0e5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java @@ -26,7 +26,6 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskReport; import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -65,8 +64,6 @@ public interface Task { public List<String> getDiagnostics(); - TaskState restoreFromEvent(HistoryEvent historyEvent); - public void registerTezEvent(TezEvent tezEvent); public TaskSpec getBaseTaskSpec(); http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index cbe72c1..ba09bd9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -28,7 +28,6 @@ import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -136,7 +135,5 @@ public interface TaskAttempt { * yet, returns 0. */ long getFinishTime(); - - TaskAttemptState restoreFromEvent(HistoryEvent event); } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 60f5a8f..9fc73a2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -49,7 +49,6 @@ import org.apache.tez.dag.app.TaskAttemptEventInfo; import org.apache.tez.dag.app.dag.event.SpeculatorEvent; import org.apache.tez.dag.app.dag.impl.AMUserCodeException; import org.apache.tez.dag.app.dag.impl.Edge; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; @@ -173,8 +172,6 @@ public interface Vertex extends Comparable<Vertex> { // internal apis AppContext getAppContext(); - VertexState restoreFromEvent(HistoryEvent event); - String getLogIdentifier(); public void incrementFailedTaskAttemptCount(); http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java index 1b7ac0f..ba9d1af 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java @@ -27,6 +27,5 @@ public enum VertexState { KILLED, ERROR, TERMINATING, - RECOVERING, COMMITTING, } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java index 45e44f3..8e1edf0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java @@ -18,37 +18,34 @@ package org.apache.tez.dag.app.dag.event; -import java.net.URL; -import java.util.List; - +import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.records.TezDAGID; public class DAGEventRecoverEvent extends DAGEvent { private final DAGState desiredState; - private final List<URL> additionalUrlsForClasspath; + private final DAGRecoveryData recoveredDagData; - public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState, - List<URL> additionalUrlsForClasspath) { + public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState, DAGRecoveryData recoveredDagData) { super(dagId, DAGEventType.DAG_RECOVER); this.desiredState = desiredState; - this.additionalUrlsForClasspath = additionalUrlsForClasspath; + this.recoveredDagData = recoveredDagData; } - public DAGEventRecoverEvent(TezDAGID dagId, List<URL> additionalUrlsForClasspath) { - this(dagId, null, additionalUrlsForClasspath); + public DAGEventRecoverEvent(TezDAGID dagId, DAGRecoveryData recoveredDagData) { + this(dagId, null, recoveredDagData); } public DAGState getDesiredState() { return desiredState; } - public List<URL> getAdditionalUrlsForClasspath() { - return this.additionalUrlsForClasspath; - } - public boolean hasDesiredState() { return this.desiredState != null; } + + public DAGRecoveryData getRecoveredDagData() { + return recoveredDagData; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java new file mode 100644 index 0000000..cad3824 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.app.dag.event; + +public interface RecoveryEvent { + + public boolean isFromRecovery(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java index 7ec8921..21c6b14 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java @@ -22,10 +22,11 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent - implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent { + implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent { private final String diagnostics; private final TaskAttemptTerminationCause errorCause; + private boolean isFromRecovery = false; /* Accepted Types - FAILED, TIMED_OUT */ public TaskAttemptEventAttemptFailed(TezTaskAttemptID id, @@ -35,6 +36,14 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent this.errorCause = errorCause; } + /* Accepted Types - FAILED, TIMED_OUT */ + public TaskAttemptEventAttemptFailed(TezTaskAttemptID id, + TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause, + boolean isFromRecovery) { + this(id, type, diagnostics, errorCause); + this.isFromRecovery = isFromRecovery; + } + @Override public String getDiagnosticInfo() { return diagnostics; @@ -45,4 +54,8 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent return errorCause; } + @Override + public boolean isFromRecovery() { + return isFromRecovery; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java index 72e6b07..4642443 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java @@ -22,10 +22,12 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent - implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent { + implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent { private final String diagnostics; private final TaskAttemptTerminationCause errorCause; + private boolean fromRecovery; + public TaskAttemptEventAttemptKilled(TezTaskAttemptID id, String diagnostics, TaskAttemptTerminationCause errorCause) { @@ -34,6 +36,14 @@ public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent this.errorCause = errorCause; } + public TaskAttemptEventAttemptKilled(TezTaskAttemptID id, + String diagnostics, + TaskAttemptTerminationCause errorCause, + boolean fromRecovery) { + this(id, diagnostics, errorCause); + this.fromRecovery = fromRecovery; + } + @Override public String getDiagnosticInfo() { return diagnostics; @@ -44,4 +54,8 @@ public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent return errorCause; } + @Override + public boolean isFromRecovery() { + return fromRecovery; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java index a0dfe5d..96cf0e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java @@ -21,10 +21,11 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptEventKillRequest extends TaskAttemptEvent - implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent { + implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent { private final String message; private final TaskAttemptTerminationCause errorCause; + private boolean fromRecovery = false; public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, TaskAttemptTerminationCause err) { super(id, TaskAttemptEventType.TA_KILL_REQUEST); @@ -32,6 +33,12 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent this.errorCause = err; } + public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, TaskAttemptTerminationCause err, + boolean fromRecovery) { + this(id, message, err); + this.fromRecovery = fromRecovery; + } + @Override public String getDiagnosticInfo() { return message; @@ -42,4 +49,9 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent return errorCause; } + @Override + public boolean isFromRecovery() { + return fromRecovery; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java index 825a143..e700c6c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java @@ -24,11 +24,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.records.TezTaskAttemptID; -public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent { +public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent implements RecoveryEvent { private final ContainerId containerId; // TODO Can appAcls be handled elsewhere ? private final Map<ApplicationAccessType, String> applicationACLs; + private boolean fromRecovery = false; public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId, Map<ApplicationAccessType, String> appAcls) { @@ -37,6 +38,12 @@ public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent { this.applicationACLs = appAcls; } + public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId, + Map<ApplicationAccessType, String> appAcls, boolean fromRecovery) { + this(id, containerId, appAcls); + this.fromRecovery = fromRecovery; + } + public ContainerId getContainerId() { return containerId; } @@ -45,4 +52,9 @@ public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent { return applicationACLs; } + @Override + public boolean isFromRecovery() { + return fromRecovery; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java new file mode 100644 index 0000000..bef9248 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.app.dag.event; + +import java.util.List; + +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TezEvent; + +public class TaskAttemptEventTezEventUpdate extends TaskAttemptEvent { + + private List<TezEvent> tezEvents; + + public TaskAttemptEventTezEventUpdate(TezTaskAttemptID taId, List<TezEvent> tezEvents) { + super(taId, TaskAttemptEventType.TA_TEZ_EVENT_UPDATE); + this.tezEvents = tezEvents; + } + + public List<TezEvent> getTezEvents() { + return tezEvents; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java index 6ba69e3..dacb0c2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java @@ -29,6 +29,7 @@ public enum TaskAttemptEventType { //Producer: TaskAttemptListener | Vertex after routing events TA_STARTED_REMOTELY, TA_STATUS_UPDATE, + TA_TEZ_EVENT_UPDATE, // for recovery TA_DONE, TA_FAILED, TA_KILLED, // Generated by TaskCommunicators @@ -55,8 +56,5 @@ public enum TaskAttemptEventType { // Producer: consumer destination vertex TA_OUTPUT_FAILED, - - // Recovery - TA_RECOVER, }
