TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/28f30b0e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/28f30b0e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/28f30b0e

Branch: refs/heads/master
Commit: 28f30b0ef654f124713acb7a213e37bbc7d8b486
Parents: c4487f9
Author: Jeff Zhang <[email protected]>
Authored: Wed Nov 25 22:01:44 2015 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Wed Nov 25 22:01:44 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 .../apache/tez/dag/api/TezConfiguration.java    |    8 +
 .../apache/tez/dag/api/event/VertexState.java   |    7 +-
 .../records/TaskAttemptTerminationCause.java    |    1 +
 .../tez/dag/api/client/VertexStatusBuilder.java |    2 -
 .../java/org/apache/tez/dag/app/AppContext.java |    4 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   38 +-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  721 ++++++---
 .../tez/dag/app/TaskCommunicatorManager.java    |   79 +-
 .../java/org/apache/tez/dag/app/dag/DAG.java    |    3 -
 .../java/org/apache/tez/dag/app/dag/Task.java   |    3 -
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |    3 -
 .../java/org/apache/tez/dag/app/dag/Vertex.java |    3 -
 .../org/apache/tez/dag/app/dag/VertexState.java |    1 -
 .../dag/app/dag/event/DAGEventRecoverEvent.java |   23 +-
 .../tez/dag/app/dag/event/RecoveryEvent.java    |   23 +
 .../event/TaskAttemptEventAttemptFailed.java    |   15 +-
 .../event/TaskAttemptEventAttemptKilled.java    |   16 +-
 .../dag/event/TaskAttemptEventKillRequest.java  |   14 +-
 .../event/TaskAttemptEventStartedRemotely.java  |   14 +-
 .../event/TaskAttemptEventTezEventUpdate.java   |   37 +
 .../dag/app/dag/event/TaskAttemptEventType.java |    4 +-
 .../dag/app/dag/event/TaskEventRecoverTask.java |   53 -
 .../app/dag/event/TaskEventScheduleTask.java    |   14 +-
 .../dag/app/dag/event/TaskEventTermination.java |   16 +-
 .../tez/dag/app/dag/event/TaskEventType.java    |    3 -
 .../app/dag/event/VertexEventRecoverVertex.java |    1 -
 .../app/dag/event/VertexEventRouteEvent.java    |   12 -
 .../event/VertexEventSourceVertexRecovered.java |   62 -
 .../tez/dag/app/dag/event/VertexEventType.java  |    3 -
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  364 ++---
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  273 ++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  384 ++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1426 ++++------------
 .../tez/dag/history/HistoryEventHandler.java    |    5 +-
 .../tez/dag/history/HistoryEventType.java       |    3 +-
 .../tez/dag/history/RecoveryConverters.java     |   27 +
 .../events/TaskAttemptFinishedEvent.java        |   26 +-
 .../events/VertexConfigurationDoneEvent.java    |  211 +++
 .../events/VertexGroupCommitFinishedEvent.java  |   26 +-
 .../events/VertexGroupCommitStartedEvent.java   |   25 +-
 .../history/events/VertexInitializedEvent.java  |   35 +-
 .../events/VertexParallelismUpdatedEvent.java   |  204 ---
 .../VertexRecoverableEventsGeneratedEvent.java  |  224 ---
 .../impl/HistoryEventJsonConversion.java        |   81 +-
 .../dag/history/recovery/RecoveryService.java   |   10 +-
 .../tez/dag/history/utils/TezEventUtils.java    |  131 ++
 tez-dag/src/main/proto/HistoryEvents.proto      |   31 +-
 .../dag/api/client/TestVertexStatusBuilder.java |    7 +-
 .../apache/tez/dag/app/TestRecoveryParser.java  |  480 +++++-
 .../dag/app/TestTaskCommunicatorManager1.java   |   56 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   | 1527 ++++++++++++------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |    5 +-
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  327 ----
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |    5 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |  873 ----------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   33 +-
 .../dag/app/dag/impl/TestVertexRecovery.java    | 1340 ---------------
 .../TestHistoryEventsProtoConversion.java       |  220 ++-
 .../impl/TestHistoryEventJsonConversion.java    |   27 +-
 .../org/apache/tez/examples/TezExampleBase.java |   12 +
 .../ats/HistoryEventTimelineConversion.java     |   18 +-
 .../ats/TestHistoryEventTimelineConversion.java |   29 +-
 .../apache/tez/runtime/api/impl/TezEvent.java   |    2 +-
 .../apache/tez/test/AMShutdownController.java   |   57 +
 .../RecoveryServiceWithEventHandlingHook.java   |  386 +++++
 .../org/apache/tez/test/TestDAGRecovery.java    |   62 -
 .../java/org/apache/tez/test/TestRecovery.java  |  484 ++++++
 .../java/org/apache/tez/test/TestTezJobs.java   |    6 +-
 .../apache/tez/test/dag/MultiAttemptDAG.java    |    2 +-
 70 files changed, 4639 insertions(+), 5989 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59847ef..b9a91f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2581. Umbrella for Tez Recovery Redesign
   TEZ-2956. Handle auto-reduce parallelism when the
   totalNumBipartiteSourceTasks is 0
   TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in 
counters page without any delay

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0ea8999..fabc256 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1493,4 +1493,12 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty(type="boolean")
   public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + 
"client.asynchronous-stop";
   public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true;
+
+  // for Recovery Test
+  @Private
+  @ConfigurationScope(Scope.TEST)
+  public static final String TEZ_AM_RECOVERY_SERVICE_CLASS =
+      TEZ_PREFIX + "test.recovery-service-class";
+  @Private
+  public static final String TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT = 
"org.apache.tez.dag.history.recovery.RecoveryService";
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
index c9c2d58..86e70a1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
@@ -56,5 +56,10 @@ public enum VertexState {
    * further. Listeners can depend on the vertex's configured state after
    * receiving this notification.
    */
-  CONFIGURED
+  CONFIGURED,
+
+  /**
+   * Indicates that the Vertex has moved to the INITIALIZING state.
+   */
+  INITIALIZING
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git 
a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
 
b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index a5214fb..14eaa3a 100644
--- 
a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ 
b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -23,6 +23,7 @@ public enum TaskAttemptTerminationCause {
   
   TERMINATED_BY_CLIENT, // Killed by client command
   TERMINATED_AT_SHUTDOWN, // Killed due execution shutdown
+  TERMINATED_AT_RECOVERY, // Killed during recovery because a running task 
attempt cannot be recovered
   INTERNAL_PREEMPTION, // Killed by Tez to makes space for higher pri work
   EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
   TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because 
original succeeded

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java 
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index ada3490..4de321c 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -65,8 +65,6 @@ public class VertexStatusBuilder extends VertexStatus {
         return VertexStatusStateProto.VERTEX_NEW;
       case INITIALIZING:
         return VertexStatusStateProto.VERTEX_INITIALIZING;
-      case RECOVERING:
-        return VertexStatusStateProto.VERTEX_NEW;
       case INITED:
         return VertexStatusStateProto.VERTEX_INITED;
       case RUNNING:

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 68453b1..30716da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -30,6 +30,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -76,6 +77,8 @@ public interface AppContext {
 
   void setDAG(DAG dag);
 
+  void setDAGRecoveryData(DAGRecoveryData dagRecoveryData);
+
   Set<String> getAllDAGIDs();
 
   @SuppressWarnings("rawtypes")
@@ -126,4 +129,5 @@ public interface AppContext {
 
   public HadoopShim getHadoopShim();
 
+  public DAGRecoveryData getDAGRecoveryData();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 2c50264..23981e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -127,7 +127,7 @@ import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
-import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -151,6 +151,10 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.app.dag.impl.TaskImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
+import org.apache.tez.dag.app.launcher.LocalContainerLauncher;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.ContainerLauncherEventType;
 import org.apache.tez.dag.app.rm.TaskSchedulerManager;
@@ -1417,6 +1421,7 @@ public class DAGAppMaster extends AbstractService {
   private class RunningAppContext implements AppContext {
 
     private DAG dag;
+    private DAGRecoveryData dagRecoveryData;
     private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -1633,6 +1638,7 @@ public class DAGAppMaster extends AbstractService {
       try {
         wLock.lock();
         this.dag = dag;
+        this.dagRecoveryData = null;
       } finally {
         wLock.unlock();
       }
@@ -1647,6 +1653,16 @@ public class DAGAppMaster extends AbstractService {
     public long getCumulativeGCTime() {
       return getAMGCTime();
     }
+
+    @Override
+    public void setDAGRecoveryData(DAGRecoveryData dagRecoveryData) {
+      this.dagRecoveryData = dagRecoveryData;
+    }
+
+    @Override
+    public DAGRecoveryData getDAGRecoveryData() {
+      return dagRecoveryData;
+    }
   }
 
   private static class ServiceWithDependency implements 
ServiceStateChangeListener {
@@ -1818,7 +1834,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private RecoveredDAGData recoverDAG() throws IOException, TezException {
+  private DAGRecoveryData recoverDAG() throws IOException, TezException {
     if (recoveryEnabled) {
       if (this.appAttemptID.getAttemptId() > 1) {
         LOG.info("Recovering data from previous attempts"
@@ -1826,7 +1842,7 @@ public class DAGAppMaster extends AbstractService {
         this.state = DAGAppMasterState.RECOVERING;
         RecoveryParser recoveryParser = new RecoveryParser(
             this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
-        RecoveredDAGData recoveredDAGData = recoveryParser.parseRecoveryData();
+        DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
         return recoveredDAGData;
       }
     }
@@ -1855,7 +1871,7 @@ public class DAGAppMaster extends AbstractService {
 
     this.lastDAGCompletionTime = clock.getTime();
 
-    RecoveredDAGData recoveredDAGData;
+    DAGRecoveryData recoveredDAGData;
     try {
       recoveredDAGData = recoverDAG();
     } catch (IOException e) {
@@ -1875,9 +1891,8 @@ public class DAGAppMaster extends AbstractService {
     }
 
     if (recoveredDAGData != null) {
-      List<URL> classpathUrls = null;
       if (recoveredDAGData.cumulativeAdditionalResources != null) {
-        classpathUrls = 
processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
+        recoveredDAGData.additionalUrlsForClasspath = 
processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
         amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
         
cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
       }
@@ -1900,9 +1915,11 @@ public class DAGAppMaster extends AbstractService {
             + ", failureReason=" + recoveredDAGData.reason);
         _updateLoggers(recoveredDAGData.recoveredDAG, "");
         if (recoveredDAGData.nonRecoverable) {
+          addDiagnostic("DAG " + recoveredDAGData.recoveredDagID + " can not 
be recovered due to "
+              + recoveredDAGData.reason);
           DAGEventRecoverEvent recoverDAGEvent =
               new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
-                  DAGState.FAILED, classpathUrls);
+                  DAGState.FAILED, recoveredDAGData);
           DAGRecoveredEvent dagRecoveredEvent = new 
DAGRecoveredEvent(this.appAttemptID,
               recoveredDAGData.recoveredDAG.getID(), 
recoveredDAGData.recoveredDAG.getName(),
               recoveredDAGData.recoveredDAG.getUserName(),
@@ -1919,7 +1936,7 @@ public class DAGAppMaster extends AbstractService {
         } else {
           DAGEventRecoverEvent recoverDAGEvent =
               new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
-                  recoveredDAGData.dagState, classpathUrls);
+                  recoveredDAGData.dagState, recoveredDAGData);
           DAGRecoveredEvent dagRecoveredEvent = new 
DAGRecoveredEvent(this.appAttemptID,
               recoveredDAGData.recoveredDAG.getID(), 
recoveredDAGData.recoveredDAG.getName(),
               recoveredDAGData.recoveredDAG.getUserName(), 
this.clock.getTime(),
@@ -1938,7 +1955,7 @@ public class DAGAppMaster extends AbstractService {
         this.historyEventHandler.handle(new 
DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
             dagRecoveredEvent));
         DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(
-            recoveredDAGData.recoveredDAG.getID(), classpathUrls);
+            recoveredDAGData.recoveredDAG.getID(), recoveredDAGData);
         dagEventDispatcher.handle(recoverDAGEvent);
         this.state = DAGAppMasterState.RUNNING;
       }
@@ -2050,7 +2067,6 @@ public class DAGAppMaster extends AbstractService {
       if (dag == null || eventDagIndex != dag.getID().getId()) {
         return; // event not relevant any more
       }
-      
       Task task =
           dag.getVertex(event.getTaskID().getVertexID()).
               getTask(event.getTaskID());
@@ -2432,7 +2448,6 @@ public class DAGAppMaster extends AbstractService {
         TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT);
   }
 
-
   @VisibleForTesting
   static void parseAllPlugins(
       List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, 
Integer> taskSchedulerPluginMap,
@@ -2547,4 +2562,5 @@ public class DAGAppMaster extends AbstractService {
     }
     return sb.toString();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 046dbd9..368dd17 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,13 +34,14 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -58,21 +60,29 @@ import 
org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
+
+/**
+ * RecoveryParser is mainly used for Tez AM recovery. It reads the recovery 
events (both summary and non-summary).
+ * 
+ */
 public class RecoveryParser {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RecoveryParser.class);
@@ -100,7 +110,8 @@ public class RecoveryParser {
     this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir);
   }
 
-  public static class RecoveredDAGData {
+  public static class DAGRecoveryData {
+
     public TezDAGID recoveredDagID = null;
     public DAGImpl recoveredDAG = null;
     public DAGState dagState = null;
@@ -109,6 +120,119 @@ public class RecoveryParser {
     public boolean isSessionStopped = false;
     public String reason = null;
     public Map<String, LocalResource> cumulativeAdditionalResources = null;
+    public List<URL> additionalUrlsForClasspath = null;
+
+    public Map<TezVertexID, VertexRecoveryData> vertexRecoveryDataMap =
+        new HashMap<TezVertexID, RecoveryParser.VertexRecoveryData>();
+    private DAGInitializedEvent dagInitedEvent;
+    private DAGStartedEvent dagStartedEvent;
+    private DAGFinishedEvent dagFinishedEvent;
+
+    private Map<TezVertexID, Boolean> vertexCommitStatus =
+        new HashMap<TezVertexID, Boolean>();
+    private Map<String, Boolean> vertexGroupCommitStatus =
+        new HashMap<String, Boolean>();
+    private Map<TezVertexID, Boolean> vertexGroupMemberCommitStatus =
+        new HashMap<TezVertexID, Boolean>();
+
+    public DAGRecoveryData(DAGSummaryData dagSummaryData) {
+      if (dagSummaryData.completed) {
+        this.isCompleted = true;
+        this.dagState = dagSummaryData.dagState;
+      }
+      dagSummaryData.checkRecoverableSummary();
+      this.nonRecoverable = dagSummaryData.nonRecoverable;
+      this.reason = dagSummaryData.reason;
+      this.vertexCommitStatus = dagSummaryData.vertexCommitStatus;
+      this.vertexGroupCommitStatus = dagSummaryData.vertexGroupCommitStatus;
+      this.vertexGroupMemberCommitStatus = 
dagSummaryData.vertexGroupMemberCommitStatus;
+    }
+
+    // The DAG is not recoverable if a vertex has a committer and has completed 
its commit (based on summary recovery events)
+    // but its full recovery events are not seen (based on non-summary 
recovery events).
+    // Reason: once a vertex is committed we cannot rerun it, and if its 
recovery events are incomplete 
+    // we cannot run other vertices that may depend on it, so we have to 
abort.
+    public void checkRecoverableNonSummary() {
+      // Full recovery events are not required if the DAG has completed 
according to the summary events.
+      if (isCompleted) {
+        return;
+      }
+      for (Map.Entry<TezVertexID, Boolean> entry : 
vertexCommitStatus.entrySet()) {
+        // vertex has finished committing
+        TezVertexID vertexId = entry.getKey();
+        boolean commitFinished = entry.getValue();
+        if(commitFinished
+            && (!vertexRecoveryDataMap.containsKey(vertexId)
+            || vertexRecoveryDataMap.get(vertexId).getVertexFinishedEvent() == 
null)) {
+          this.nonRecoverable = true;
+          this.reason = "Vertex has been committed, but its full recovery 
events are not seen, vertexId="
+              + vertexId;
+          return;
+        }
+      }
+      for (Map.Entry<TezVertexID, Boolean> entry : 
vertexGroupMemberCommitStatus.entrySet()) {
+        // vertex has finished committing
+        TezVertexID vertexId = entry.getKey();
+        boolean commitFinished = entry.getValue();
+        if(commitFinished
+            && (!vertexRecoveryDataMap.containsKey(vertexId)
+            || vertexRecoveryDataMap.get(vertexId).getVertexFinishedEvent() == 
null)) {
+          this.nonRecoverable = true;
+          this.reason = "Vertex has been committed as member of vertex group"
+              + ", but its full recovery events are not seen, vertexId=" + 
vertexId;
+          return;
+        }
+      }
+    }
+
+    public DAGInitializedEvent getDAGInitializedEvent() {
+      return dagInitedEvent;
+    }
+
+    public DAGStartedEvent getDAGStartedEvent() {
+      return dagStartedEvent;
+    }
+
+    public DAGFinishedEvent getDAGFinishedEvent() {
+      return dagFinishedEvent;
+    }
+
+    public boolean isVertexGroupCommitted(String groupName) {
+      return vertexGroupCommitStatus.containsKey(groupName)
+          && vertexGroupCommitStatus.get(groupName);
+    }
+
+    public VertexRecoveryData getVertexRecoveryData(TezVertexID vertexId) {
+      return vertexRecoveryDataMap.get(vertexId);
+    }
+
+    public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) {
+      VertexRecoveryData vertexRecoveryData = 
getVertexRecoveryData(taskId.getVertexID());
+      if (vertexRecoveryData != null) {
+        return vertexRecoveryData.taskRecoveryDataMap.get(taskId);
+      } else {
+        return null;
+      }
+    }
+
+    public TaskAttemptRecoveryData getTaskAttemptRecoveryData(TezTaskAttemptID 
taId) {
+      TaskRecoveryData taskRecoveryData = 
getTaskRecoveryData(taId.getTaskID());
+      if (taskRecoveryData != null) {
+        return taskRecoveryData.taRecoveryDataMap.get(taId);
+      } else {
+        return null;
+      }
+    }
+
+    public VertexRecoveryData maybeCreateVertexRecoveryData(TezVertexID 
vertexId) {
+      VertexRecoveryData vRecoveryData = vertexRecoveryDataMap.get(vertexId);
+      if (vRecoveryData == null) {
+        vRecoveryData = new 
VertexRecoveryData(vertexCommitStatus.containsKey(vertexId)
+            ? vertexCommitStatus.get(vertexId) : false);
+        vertexRecoveryDataMap.put(vertexId, vRecoveryData);
+      }
+      return vRecoveryData;
+    }
   }
 
   private static void parseSummaryFile(FSDataInputStream inputStream)
@@ -178,12 +302,12 @@ public class RecoveryParser {
       case VERTEX_INITIALIZED:
         event = new VertexInitializedEvent();
         break;
+      case VERTEX_CONFIGURE_DONE:
+        event = new VertexConfigurationDoneEvent();
+        break;
       case VERTEX_STARTED:
         event = new VertexStartedEvent();
         break;
-      case VERTEX_PARALLELISM_UPDATED:
-        event = new VertexParallelismUpdatedEvent();
-        break;
       case VERTEX_COMMIT_STARTED:
         event = new VertexCommitStartedEvent();
         break;
@@ -208,18 +332,11 @@ public class RecoveryParser {
       case TASK_ATTEMPT_FINISHED:
         event = new TaskAttemptFinishedEvent();
         break;
-      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-        event = new VertexRecoverableEventsGeneratedEvent();
-        break;
       default:
         throw new IOException("Invalid data found, unknown event type "
             + eventType);
 
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Parsing event from input stream"
-          + ", eventType=" + eventType);
-    }
     try {
       event.fromProtoStream(inputStream);
     } catch (EOFException eof) {
@@ -233,10 +350,6 @@ public class RecoveryParser {
     return event;
   }
 
-
-
-
-
   public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream 
inputStream)
       throws IOException {
     List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
@@ -246,11 +359,45 @@ public class RecoveryParser {
         LOG.info("Reached end of stream");
         break;
       }
+      LOG.debug("Read HistoryEvent, eventType=" + historyEvent.getEventType() 
+ ", event=" + historyEvent);
       historyEvents.add(historyEvent);
     }
     return historyEvents;
   }
 
+  public static List<HistoryEvent> readRecoveryEvents(TezConfiguration 
tezConf, ApplicationId appId,
+      int attempt) throws IOException {
+    Path tezSystemStagingDir =
+        TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
+    Path recoveryDataDir =
+        TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
+    FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
+    List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
+    for (int i=1; i <= attempt; ++i) {
+      Path currentAttemptRecoveryDataDir =
+          TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
+      Path recoveryFilePath =
+          new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
+              "application", "dag")
+              + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+      if (fs.exists(recoveryFilePath)) {
+        LOG.info("Read recovery file:" + recoveryFilePath);
+        FSDataInputStream in = null;
+        try {
+          in = fs.open(recoveryFilePath);
+          historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in));
+        } catch (IOException e) {
+          throw e;
+        } finally {
+          if (in != null) {
+            in.close();
+          }
+        }
+      }
+    }
+    return historyEvents;
+  }
+
   public static void main(String argv[]) throws IOException {
     // TODO clean up with better usage and error handling
     Configuration conf = new Configuration();
@@ -325,13 +472,15 @@ public class RecoveryParser {
     final TezDAGID dagId;
     boolean completed = false;
     boolean dagCommitCompleted = true;
+    boolean nonRecoverable = false;
+    String reason;
     DAGState dagState;
-    Map<TezVertexID, Boolean> vertexCommitStatus =
+    public Map<TezVertexID, Boolean> vertexCommitStatus =
         new HashMap<TezVertexID, Boolean>();
-    Map<String, Boolean> vertexGroupCommitStatus =
+    public Map<String, Boolean> vertexGroupCommitStatus =
         new HashMap<String, Boolean>();
-    List<HistoryEvent> bufferedSummaryEvents =
-        new ArrayList<HistoryEvent>();
+    public Map<TezVertexID, Boolean> vertexGroupMemberCommitStatus =
+        new HashMap<TezVertexID, Boolean>();
 
     DAGSummaryData(TezDAGID dagId) {
       this.dagId = dagId;
@@ -356,7 +505,6 @@ public class RecoveryParser {
         case DAG_KILL_REQUEST:
           DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent();
           killRequestEvent.fromSummaryProtoStream(proto);
-          bufferedSummaryEvents.add(killRequestEvent);
           break;
         case DAG_COMMIT_STARTED:
           dagCommitCompleted = false;
@@ -375,24 +523,27 @@ public class RecoveryParser {
           if 
(vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) {
             vertexCommitStatus.put(
                 vertexFinishedEvent.getVertexID(), true);
-            bufferedSummaryEvents.add(vertexFinishedEvent);
           }
           break;
         case VERTEX_GROUP_COMMIT_STARTED:
           VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
               new VertexGroupCommitStartedEvent();
           vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
-          bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
           vertexGroupCommitStatus.put(
               vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+          for (TezVertexID member : 
vertexGroupCommitStartedEvent.getVertexIds()) {
+            vertexGroupMemberCommitStatus.put(member, false);
+          }
           break;
         case VERTEX_GROUP_COMMIT_FINISHED:
           VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
               new VertexGroupCommitFinishedEvent();
           vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
-          bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
           vertexGroupCommitStatus.put(
               vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+          for (TezVertexID member : 
vertexGroupCommitFinishedEvent.getVertexIds()) {
+            vertexGroupMemberCommitStatus.put(member, true);
+          }
           break;
         default:
           String message = "Found invalid summary event that was not handled"
@@ -401,6 +552,37 @@ public class RecoveryParser {
       }
     }
 
+    /**
+     * Checks, from the summary data alone, whether the DAG can be recovered.
+     * The DAG is flagged non-recoverable if, in this order of precedence,
+     *  1. the DAG commit was still in progress,
+     *  2. some vertex commit was still in progress, or
+     *  3. some vertex group commit was still in progress.
+     * Sets {@code nonRecoverable} and {@code reason} for the first cause found and
+     * stops there, so an earlier cause is never overwritten by a later one.
+     * Leaves both fields untouched when no commit is pending.
+     */
+    private void checkRecoverableSummary() {
+      if (!dagCommitCompleted) {
+        this.nonRecoverable = true;
+        this.reason = "DAG Commit was in progress, not recoverable"
+            + ", dagId=" + dagId;
+        return;
+      }
+      for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) {
+        if (!entry.getValue()) {
+          this.nonRecoverable = true;
+          this.reason = "Vertex Commit was in progress, not recoverable"
+              + ", dagId=" + dagId
+              + ", vertexId=" + entry.getKey();
+          return;
+        }
+      }
+      // A group is put as false on VERTEX_GROUP_COMMIT_STARTED and flipped to true
+      // on VERTEX_GROUP_COMMIT_FINISHED, so false here means the commit never finished.
+      for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) {
+        if (!entry.getValue()) {
+          this.nonRecoverable = true;
+          this.reason = "Vertex Group Commit was in progress, not recoverable"
+              + ", dagId=" + dagId
+              + ", vertexGroup=" + entry.getKey();
+          return;
+        }
+      }
+    }
+
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
@@ -426,32 +608,6 @@ public class RecoveryParser {
     }
   }
 
-  private String isDAGRecoverable(DAGSummaryData data) {
-    if (!data.dagCommitCompleted) {
-      return "DAG Commit was in progress, not recoverable"
-          + ", dagId=" + data.dagId;
-    }
-    if (!data.vertexCommitStatus.isEmpty()) {
-      for (Entry<TezVertexID, Boolean> entry : 
data.vertexCommitStatus.entrySet()) {
-        if (!(entry.getValue().booleanValue())) {
-          return "Vertex Commit was in progress, not recoverable"
-              + ", dagId=" + data.dagId
-              + ", vertexId=" + entry.getKey();
-        }
-      }
-    }
-    if (!data.vertexGroupCommitStatus.isEmpty()) {
-      for (Entry<String, Boolean> entry : 
data.vertexGroupCommitStatus.entrySet()) {
-        if (!(entry.getValue().booleanValue())) {
-          return "Vertex Group Commit was in progress, not recoverable"
-              + ", dagId=" + data.dagId
-              + ", vertexGroup=" + entry.getKey();
-        }
-      }
-    }
-    return null;
-  }
-
   private List<Path> getSummaryFiles() throws IOException {
     List<Path> summaryFiles = new ArrayList<Path>();
     for (int i = 1; i < currentAttemptId; ++i) {
@@ -483,11 +639,22 @@ public class RecoveryParser {
     return recoveryFiles;
   }
 
-  public RecoveredDAGData parseRecoveryData() throws IOException {
+  /**
+   * 1. Read the summary recovery file and build DAGSummaryData.
+   *    Check whether the DAG is recoverable based on the summary file
+   *    (whether the DAG is in the middle of committing).
+   * 2. Read the non-summary recovery file and build DAGRecoveryData.
+   *    Check whether the DAG is recoverable based on both the summary file and the
+   *    non-summary file (whether a vertex has completed its commit, but its full
+   *    non-summary recovery events are not seen).
+   * @return DAGRecoveryData
+   * @throws IOException
+   */
+  public DAGRecoveryData parseRecoveryData() throws IOException {
     int dagCounter = 0;
     Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
         new HashMap<TezDAGID, DAGSummaryData>();
     List<Path> summaryFiles = getSummaryFiles();
+    LOG.debug("SummaryFile size:" + summaryFiles.size());
     for (Path summaryFile : summaryFiles) {
       FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryFile);
       LOG.info("Parsing summary file"
@@ -559,33 +726,24 @@ public class RecoveryParser {
     LOG.info("Checking if DAG is in recoverable state"
         + ", dagId=" + lastInProgressDAGData.dagId);
 
-    final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
-    if (lastInProgressDAGData.completed) {
-      recoveredDAGData.isCompleted = true;
-      recoveredDAGData.dagState = lastInProgressDAGData.dagState;
-    }
-
-    String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
-    if (nonRecoverableReason != null) {
-      LOG.warn("Found last inProgress DAG but not recoverable: "
-          + lastInProgressDAGData);
-      recoveredDAGData.nonRecoverable = true;
-      recoveredDAGData.reason = nonRecoverableReason;
-    }
-
+    final DAGRecoveryData recoveredDAGData = new 
DAGRecoveryData(lastInProgressDAGData);
     List<Path> dagRecoveryFiles = getDAGRecoveryFiles(lastInProgressDAG);
     boolean skipAllOtherEvents = false;
     Path lastRecoveryFile = null;
+    // Read the non-summary events even when the DAG is non-recoverable (just read the
+    // DAGSubmittedEvent to create the DAGImpl).
     for (Path dagRecoveryFile : dagRecoveryFiles) {
       if (skipAllOtherEvents) {
         LOG.warn("Other recovery files will be skipped due to error in the 
previous recovery file"
             + lastRecoveryFile);
         break;
       }
+      FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
       lastRecoveryFile = dagRecoveryFile;
       LOG.info("Trying to recover dag from recovery file"
           + ", dagId=" + lastInProgressDAG.toString()
-          + ", dagRecoveryFile=" + dagRecoveryFile);
+          + ", dagRecoveryFile=" + dagRecoveryFile
+          + ", len=" + fileStatus.getLen());
       FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, 
recoveryBufferSize);
       while (true) {
         HistoryEvent event;
@@ -606,14 +764,14 @@ public class RecoveryParser {
           // hit an error - skip reading other events
           break;
         }
+
         HistoryEventType eventType = event.getEventType();
+        LOG.info("Recovering from event"
+            + ", eventType=" + eventType
+            + ", event=" + event.toString());
         switch (eventType) {
           case DAG_SUBMITTED:
-          {
             DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
             recoveredDAGData.recoveredDAG = 
dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
                 lastInProgressDAG);
             recoveredDAGData.cumulativeAdditionalResources = submittedEvent
@@ -624,195 +782,110 @@ public class RecoveryParser {
               skipAllOtherEvents = true;
             }
             break;
-          }
           case DAG_INITIALIZED:
-          {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            recoveredDAGData.dagInitedEvent = (DAGInitializedEvent)event;
             break;
-          }
           case DAG_STARTED:
-          {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            recoveredDAGData.dagStartedEvent= (DAGStartedEvent)event;
             break;
-          }
+          case DAG_FINISHED:
+            recoveredDAGData.dagFinishedEvent = (DAGFinishedEvent)event;
+            skipAllOtherEvents = true;
+            break; 
           case DAG_COMMIT_STARTED:
-          {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-            break;
-          }
           case VERTEX_GROUP_COMMIT_STARTED:
+          case VERTEX_GROUP_COMMIT_FINISHED: 
+          case CONTAINER_LAUNCHED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-            break;
-          }
-          case VERTEX_GROUP_COMMIT_FINISHED:
-          {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            // Nothing to do for now
             break;
           }
           case DAG_KILL_REQUEST:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            break;
-          }
-          case DAG_FINISHED:
-          {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            // If this is seen, nothing to recover
-            assert recoveredDAGData.recoveredDAG != null;
-            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-            recoveredDAGData.isCompleted = true;
-            recoveredDAGData.dagState =
-                ((DAGFinishedEvent) event).getState();
-            skipAllOtherEvents = true;
-            break;
-          }
-          case CONTAINER_LAUNCHED:
-          {
-            // Nothing to do for now
             break;
           }
           case VERTEX_INITIALIZED:
+
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
-            Vertex v = 
recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-            v.restoreFromEvent(vEvent);
+            VertexInitializedEvent vertexInitEvent = 
(VertexInitializedEvent)event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.maybeCreateVertexRecoveryData(vertexInitEvent.getVertexID());
+            vertexRecoveryData.vertexInitedEvent = vertexInitEvent;
             break;
           }
-          case VERTEX_STARTED:
+          case VERTEX_CONFIGURE_DONE:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            VertexStartedEvent vEvent = (VertexStartedEvent) event;
-            Vertex v = 
recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-            v.restoreFromEvent(vEvent);
+            VertexConfigurationDoneEvent reconfigureDoneEvent = 
(VertexConfigurationDoneEvent)event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.maybeCreateVertexRecoveryData(reconfigureDoneEvent.getVertexID());
+            vertexRecoveryData.vertexConfigurationDoneEvent = 
reconfigureDoneEvent;
             break;
           }
-          case VERTEX_PARALLELISM_UPDATED:
+          case VERTEX_STARTED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            VertexParallelismUpdatedEvent vEvent = 
(VertexParallelismUpdatedEvent) event;
-            Vertex v = 
recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-            v.restoreFromEvent(vEvent);
+            VertexStartedEvent vertexStartedEvent = (VertexStartedEvent)event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.vertexRecoveryDataMap.get(vertexStartedEvent.getVertexID());
+            Preconditions.checkArgument(vertexRecoveryData != null, "No 
VertexInitializedEvent before VertexStartedEvent");
+            vertexRecoveryData.vertexStartedEvent = vertexStartedEvent;
             break;
           }
           case VERTEX_COMMIT_STARTED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
-            Vertex v = 
recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-            v.restoreFromEvent(vEvent);
             break;
           }
           case VERTEX_FINISHED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
-            Vertex v = 
recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-            v.restoreFromEvent(vEvent);
+            VertexFinishedEvent vertexFinishedEvent = 
(VertexFinishedEvent)event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.maybeCreateVertexRecoveryData(vertexFinishedEvent.getVertexID());
+            vertexRecoveryData.vertexFinishedEvent = vertexFinishedEvent;
             break;
           }
           case TASK_STARTED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            TaskStartedEvent tEvent = (TaskStartedEvent) event;
-            Task task = recoveredDAGData.recoveredDAG.getVertex(
-                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
-            task.restoreFromEvent(tEvent);
+            TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
+            Preconditions.checkArgument(vertexRecoveryData != null,
+                "Invalid TaskStartedEvent, its vertex does not exist:" + 
taskStartedEvent.getTaskID().getVertexID());
+            TaskRecoveryData taskRecoveryData = 
vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
+            taskRecoveryData.taskStartedEvent = taskStartedEvent;
             break;
           }
           case TASK_FINISHED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
-            Task task = recoveredDAGData.recoveredDAG.getVertex(
-                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
-            task.restoreFromEvent(tEvent);
+            TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
+            Preconditions.checkArgument(vertexRecoveryData != null,
+                "Invalid TaskFinishedEvent, its vertex does not exist:" + 
taskFinishedEvent.getTaskID().getVertexID());
+            TaskRecoveryData taskRecoveryData = 
vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
+            taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
             break;
           }
           case TASK_ATTEMPT_STARTED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
-            Task task =
-                recoveredDAGData.recoveredDAG.getVertex(
-                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
-                        .getTask(tEvent.getTaskAttemptID().getTaskID());
-            task.restoreFromEvent(tEvent);
+            TaskAttemptStartedEvent taStartedEvent = 
(TaskAttemptStartedEvent)event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.vertexRecoveryDataMap.get(
+                taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
+            Preconditions.checkArgument(vertexRecoveryData != null,
+                "Invalid TaskAttemptStartedEvent, its vertexId does not exist, 
taId=" + taStartedEvent.getTaskAttemptID());
+            TaskRecoveryData taskRecoveryData = 
vertexRecoveryData.taskRecoveryDataMap
+                .get(taStartedEvent.getTaskAttemptID().getTaskID());
+            Preconditions.checkArgument(taskRecoveryData != null,
+                "Invalid TaskAttemptStartedEvent, its taskId does not exist, 
taId=" + taStartedEvent.getTaskAttemptID());
+            TaskAttemptRecoveryData taRecoveryData = 
taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taStartedEvent.getTaskAttemptID());
+            taRecoveryData.taStartedEvent = taStartedEvent;
             break;
           }
           case TASK_ATTEMPT_FINISHED:
           {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
-            Task task =
-                recoveredDAGData.recoveredDAG.getVertex(
-                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
-                    .getTask(tEvent.getTaskAttemptID().getTaskID());
-            task.restoreFromEvent(tEvent);
-            break;
-          }
-          case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          {
-            LOG.info("Recovering from event"
-                + ", eventType=" + eventType
-                + ", event=" + event.toString());
-            assert recoveredDAGData.recoveredDAG != null;
-            VertexRecoverableEventsGeneratedEvent vEvent =
-                (VertexRecoverableEventsGeneratedEvent) event;
-            Vertex v = 
recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-            v.restoreFromEvent(vEvent);
+            TaskAttemptFinishedEvent taFinishedEvent = 
(TaskAttemptFinishedEvent)event;
+            VertexRecoveryData vertexRecoveryData = 
recoveredDAGData.vertexRecoveryDataMap.get(
+                taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
+            Preconditions.checkArgument(vertexRecoveryData != null,
+                "Invalid TaskAttemtFinishedEvent, its vertexId does not exist, 
taId=" + taFinishedEvent.getTaskAttemptID());
+            TaskRecoveryData taskRecoveryData = 
vertexRecoveryData.taskRecoveryDataMap
+                .get(taFinishedEvent.getTaskAttemptID().getTaskID());
+            Preconditions.checkArgument(taskRecoveryData != null,
+                "Invalid TaskAttemptFinishedEvent, its taskId does not exist, 
taId=" + taFinishedEvent.getTaskAttemptID());
+            TaskAttemptRecoveryData taRecoveryData = 
taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taFinishedEvent.getTaskAttemptID());
+            taRecoveryData.taFinishedEvent = taFinishedEvent;
             break;
           }
           default:
@@ -828,49 +901,185 @@ public class RecoveryParser {
       }
       dagRecoveryStream.close();
     }
+    recoveredDAGData.checkRecoverableNonSummary();
+    return recoveredDAGData;
+  }
 
-    if (!recoveredDAGData.isCompleted
-        && !recoveredDAGData.nonRecoverable) {
-      if (lastInProgressDAGData.bufferedSummaryEvents != null
-        && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
-        for (HistoryEvent bufferedEvent : 
lastInProgressDAGData.bufferedSummaryEvents) {
-          assert recoveredDAGData.recoveredDAG != null;
-          switch (bufferedEvent.getEventType()) {
-            case VERTEX_GROUP_COMMIT_STARTED:
-              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
-              break;
-            case VERTEX_GROUP_COMMIT_FINISHED:
-              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
-              break;
-            case VERTEX_FINISHED:
-              VertexFinishedEvent vertexFinishedEvent =
-                  (VertexFinishedEvent) bufferedEvent;
-              Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
-                  vertexFinishedEvent.getVertexID());
-              if (vertex == null) {
-                recoveredDAGData.nonRecoverable = true;
-                recoveredDAGData.reason = "All state could not be recovered"
-                    + ", vertex completed but events not flushed"
-                    + ", vertexId=" + vertexFinishedEvent.getVertexID();
-              } else {
-                vertex.restoreFromEvent(vertexFinishedEvent);
-              }
-              break;
-            case DAG_KILL_REQUEST:
-              DAGKillRequestEvent killRequestEvent = 
(DAGKillRequestEvent)bufferedEvent;
-              recoveredDAGData.isSessionStopped = 
killRequestEvent.isSessionStopped();
-              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
-              break;
-            default:
-              throw new RuntimeException("Invalid data found in buffered 
summary events"
-                  + ", unknown event type "
-                  + bufferedEvent.getEventType());
-          }
-        }
+  /**
+   * Recovery-log state gathered for a single vertex: the init, configuration-done,
+   * start and finish events seen for it, the recovery data of its tasks keyed by
+   * task id, and a commit flag. The enclosing parser assigns the event fields
+   * directly while reading the recovery files; an event field that is still
+   * {@code null} means no such event was recorded here.
+   */
+  public static class VertexRecoveryData {
+
+    private VertexInitializedEvent vertexInitedEvent;
+    private VertexConfigurationDoneEvent vertexConfigurationDoneEvent;
+    private VertexStartedEvent vertexStartedEvent;
+    private VertexFinishedEvent vertexFinishedEvent;
+    // Per-task recovery data, keyed by task id.
+    private Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap =
+        new HashMap<TezTaskID, RecoveryParser.TaskRecoveryData>();
+    // Commit flag supplied at construction and reported by isVertexCommitted().
+    private boolean commited;
+
+    /**
+     * Builds a fully specified instance (marked for test use).
+     * The given map is stored by reference, not copied.
+     * NOTE(review): a null map would make the task lookups below throw - confirm
+     * that callers never pass null.
+     */
+    @VisibleForTesting
+    public VertexRecoveryData(VertexInitializedEvent vertexInitedEvent,
+        VertexConfigurationDoneEvent vertexReconfigureDoneEvent,
+        VertexStartedEvent vertexStartedEvent,
+        VertexFinishedEvent vertexFinishedEvent,
+        Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap, boolean 
commited) {
+      super();
+      this.vertexInitedEvent = vertexInitedEvent;
+      this.vertexConfigurationDoneEvent = vertexReconfigureDoneEvent;
+      this.vertexStartedEvent = vertexStartedEvent;
+      this.vertexFinishedEvent = vertexFinishedEvent;
+      this.taskRecoveryDataMap = taskRecoveryDataMap;
+      this.commited = commited;
+    }
+
+    /** Creates an instance with no events and an empty task map. */
+    public VertexRecoveryData(boolean committed) {
+      this.commited = committed;
+    }
+ 
+    public VertexInitializedEvent getVertexInitedEvent() {
+      return vertexInitedEvent;
+    }
+
+    public VertexStartedEvent getVertexStartedEvent() {
+      return vertexStartedEvent;
+    }
+
+    public VertexFinishedEvent getVertexFinishedEvent() {
+      return vertexFinishedEvent;
+    }
+
+    public VertexConfigurationDoneEvent getVertexConfigurationDoneEvent() {
+      return vertexConfigurationDoneEvent;
+    }
+
+    /** @return true if a configuration-done event was recorded */
+    public boolean isReconfigureDone() {
+      return vertexConfigurationDoneEvent != null;
+    }
+
+    /** @return true if an initialized event was recorded */
+    public boolean isVertexInited() {
+      return vertexInitedEvent != null;
+    }
+
+    /** @return true only if both the initialized and the configuration-done events were recorded */
+    public boolean shouldSkipInit() {
+      return vertexInitedEvent != null && vertexConfigurationDoneEvent != null;
+    }
+
+    /** @return true if a started event was recorded */
+    public boolean isVertexStarted() {
+      return vertexStartedEvent != null;
+    }
+
+    /** @return true if a finished event was recorded and its state is SUCCEEDED */
+    public boolean isVertexSucceeded() {
+      if (vertexFinishedEvent == null) {
+        return false;
       }
+      return vertexFinishedEvent.getState().equals(VertexState.SUCCEEDED);
     }
 
-    return recoveredDAGData;
+    /** @return true if a finished event was recorded, whatever its state */
+    public boolean isVertexFinished() {
+      return vertexFinishedEvent != null;
+    }
+
+    public boolean isVertexCommitted() {
+      return this.commited;
+    }
+
+    /** @return the recovery data stored for the task, or null if there is none */
+    public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) {
+      return taskRecoveryDataMap.get(taskId);
+    }
+
+    /**
+     * Returns the recovery data stored for the task, first creating and
+     * registering an empty one if the map has no entry for it.
+     */
+    public TaskRecoveryData maybeCreateTaskRecoveryData(TezTaskID taskId) {
+      TaskRecoveryData taskRecoveryData = taskRecoveryDataMap.get(taskId);
+      if (taskRecoveryData == null) {
+        taskRecoveryData = new TaskRecoveryData();
+        taskRecoveryDataMap.put(taskId, taskRecoveryData);
+      }
+      return taskRecoveryData;
+    }
+
+    // Only the initialized event is included in the string form.
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("VertexInitedEvent=" + vertexInitedEvent);
+      // NOTE(review): appending an empty string has no effect - looks like a placeholder.
+      builder.append("");
+      return builder.toString();
+    }
   }
 
+  /**
+   * Recovery-log state gathered for a single task: its started and finished
+   * events plus the recovery data of its attempts, keyed by attempt id.
+   * An event that is still {@code null} was not recorded here.
+   */
+  public static class TaskRecoveryData {
+
+    private TaskStartedEvent taskStartedEvent;
+    private TaskFinishedEvent taskFinishedEvent;
+    private Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
+        new HashMap<TezTaskAttemptID, RecoveryParser.TaskAttemptRecoveryData>();
+
+    /** Creates an instance with no events and no attempts. */
+    public TaskRecoveryData() {
+    }
+
+    /** Creates a fully specified instance; the attempt map is kept by reference. */
+    @VisibleForTesting
+    public TaskRecoveryData(TaskStartedEvent taskStartedEvent,
+        TaskFinishedEvent taskFinishedEvent,
+        Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap) {
+      this.taRecoveryDataMap = taRecoveryDataMap;
+      this.taskFinishedEvent = taskFinishedEvent;
+      this.taskStartedEvent = taskStartedEvent;
+    }
+
+    public TaskStartedEvent getTaskStartedEvent() {
+      return this.taskStartedEvent;
+    }
+
+    public TaskFinishedEvent getTaskFinishedEvent() {
+      return this.taskFinishedEvent;
+    }
+
+    /** @return true if a started event is present */
+    public boolean isTaskStarted() {
+      return getTaskStartedEvent() != null;
+    }
+
+    /** @return true if the attempt is known here and reports itself as succeeded */
+    public boolean isTaskAttemptSucceeded(TezTaskAttemptID taId) {
+      TaskAttemptRecoveryData attemptData = taRecoveryDataMap.get(taId);
+      return attemptData != null && attemptData.isTaskAttemptSucceeded();
+    }
+
+    /**
+     * Returns the recovery data stored for the attempt, first creating and
+     * registering an empty one if there is none.
+     */
+    public TaskAttemptRecoveryData maybeCreateTaskAttemptRecoveryData(
+        TezTaskAttemptID taId) {
+      TaskAttemptRecoveryData existing = taRecoveryDataMap.get(taId);
+      if (existing != null) {
+        return existing;
+      }
+      TaskAttemptRecoveryData created = new TaskAttemptRecoveryData();
+      taRecoveryDataMap.put(taId, created);
+      return created;
+    }
+  }
+
+
+  /**
+   * Recovery-log state gathered for a single task attempt: its started and
+   * finished events. An event that is still {@code null} was not recorded here.
+   */
+  public static class TaskAttemptRecoveryData {
+
+    private TaskAttemptStartedEvent taStartedEvent;
+    private TaskAttemptFinishedEvent taFinishedEvent;
+
+    /** Creates an instance with neither event set. */
+    public TaskAttemptRecoveryData() {
+    }
+
+    /** Creates an instance holding the given events. */
+    @VisibleForTesting
+    public TaskAttemptRecoveryData(TaskAttemptStartedEvent taStartedEvent,
+        TaskAttemptFinishedEvent taFinishedEvent) {
+      this.taFinishedEvent = taFinishedEvent;
+      this.taStartedEvent = taStartedEvent;
+    }
+
+    public TaskAttemptStartedEvent getTaskAttemptStartedEvent() {
+      return this.taStartedEvent;
+    }
+
+    public TaskAttemptFinishedEvent getTaskAttemptFinishedEvent() {
+      return this.taFinishedEvent;
+    }
+
+    /** @return true if a finished event is present and its state is SUCCEEDED */
+    public boolean isTaskAttemptSucceeded() {
+      TaskAttemptFinishedEvent finished = getTaskAttemptFinishedEvent();
+      if (finished == null) {
+        return false;
+      }
+      return finished.getState() == TaskAttemptState.SUCCEEDED;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 924222a..92bf3c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -36,7 +36,9 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.slf4j.Logger;
@@ -57,11 +59,14 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -227,12 +232,14 @@ public class TaskCommunicatorManager extends AbstractService implements
       }
 
       long currTime = context.getClock().getTime();
-      List<TezEvent> otherEvents = new ArrayList<TezEvent>();
-      // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
-      // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
-      // to VertexImpl to ensure the events ordering
-      //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
-      //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+      // taFinishedEvents - means the TaskAttemptFinishedEvent
+      // taGeneratedEvents - for recovery, means the events generated by this task attempt and is needed by its downstream vertices
+      // eventsForVertex - including all the taGeneratedEvents and other events such as INPUT_READ_ERROR_EVENT/INPUT_FAILED_EVENT
+      // taGeneratedEvents is routed both to TaskAttempt & Vertex. Route to Vertex is for performance consideration
+      // taFinishedEvents must be routed before taGeneratedEvents
+      List<TezEvent> taFinishedEvents = new ArrayList<TezEvent>();
+      List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+      List<TezEvent> eventsForVertex = new ArrayList<TezEvent>();
       TaskAttemptEventStatusUpdate taskAttemptEvent = null;
       boolean readErrorReported = false;
       for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
@@ -244,21 +251,74 @@ public class TaskCommunicatorManager extends AbstractService implements
           // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed
           taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
               (TaskStatusUpdateEvent) tezEvent.getEvent());
+        } else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT
+           || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT) {
+          taFinishedEvents.add(tezEvent);
         } else {
           if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
             readErrorReported = true;
           }
-          otherEvents.add(tezEvent);
+          if (eventType == EventType.DATA_MOVEMENT_EVENT
+            || eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT
+            || eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT
+            || eventType == EventType.VERTEX_MANAGER_EVENT) {
+            taGeneratedEvents.add(tezEvent);
+          }
+          eventsForVertex.add(tezEvent);
         }
       }
       if (taskAttemptEvent != null) {
         taskAttemptEvent.setReadErrorReported(readErrorReported);
         context.getEventHandler().handle(taskAttemptEvent);
       }
-      if(!otherEvents.isEmpty()) {
+      // route taGeneratedEvents to TaskAttempt
+      if (!taGeneratedEvents.isEmpty()) {
+        context.getEventHandler().handle(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
+      }
+      // route events to TaskAttempt
+      Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent");
+      for (TezEvent e : taFinishedEvents) {
+        EventMetaData sourceMeta = e.getSourceInfo();
+        switch (e.getEventType()) {
+        case TASK_ATTEMPT_FAILED_EVENT:
+          TaskAttemptTerminationCause errCause = null;
+          switch (sourceMeta.getEventGenerator()) {
+          case INPUT:
+            errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
+            break;
+          case PROCESSOR:
+            errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
+            break;
+          case OUTPUT:
+            errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
+            break;
+          case SYSTEM:
+            errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+            break;
+          default:
+            throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
+                sourceMeta.getEventGenerator());
+          }
+          TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent();
+          context.getEventHandler().handle(
+               new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
+                   TaskAttemptEventType.TA_FAILED,
+                  "Error: " + taskFailedEvent.getDiagnostics(),
+                    errCause));
+          break;
+        case TASK_ATTEMPT_COMPLETED_EVENT:
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
+          break;
+        default:
+          throw new TezUncheckedException("Unhandled tez event type: "
+             + e.getEventType());
+        }
+      }
+      if (!eventsForVertex.isEmpty()) {
         TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
         context.getEventHandler().handle(
-            new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
+            new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
       }
       taskHeartbeatHandler.pinged(taskAttemptID);
       eventInfo = context
@@ -269,6 +329,7 @@ public class TaskCommunicatorManager extends AbstractService implements
     }
     return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
   }
+
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 640359d..a01c623 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -34,7 +34,6 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.common.security.ACLManager;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 
@@ -90,8 +89,6 @@ public interface DAG {
   
   UserGroupInformation getDagUGI();
 
-  DAGState restoreFromEvent(HistoryEvent historyEvent);
-
   ACLManager getACLManager();
 
   Map<String, TezVertexID> getVertexNameIDMapping();

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index a011b61..04f0e5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -26,7 +26,6 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -65,8 +64,6 @@ public interface Task {
   
   public List<String> getDiagnostics();
 
-  TaskState restoreFromEvent(HistoryEvent historyEvent);
-
   public void registerTezEvent(TezEvent tezEvent);
   
   public TaskSpec getBaseTaskSpec();

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index cbe72c1..ba09bd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -28,7 +28,6 @@ import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -136,7 +135,5 @@ public interface TaskAttempt {
    *  yet, returns 0.
    */
   long getFinishTime();
-  
-  TaskAttemptState restoreFromEvent(HistoryEvent event);
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 60f5a8f..9fc73a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -49,7 +49,6 @@ import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.Edge;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -173,8 +172,6 @@ public interface Vertex extends Comparable<Vertex> {
   // internal apis
   AppContext getAppContext();
 
-  VertexState restoreFromEvent(HistoryEvent event);
-
   String getLogIdentifier();
 
   public void incrementFailedTaskAttemptCount();

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 1b7ac0f..ba9d1af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -27,6 +27,5 @@ public enum VertexState {
   KILLED,
   ERROR,
   TERMINATING,
-  RECOVERING,
   COMMITTING,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
index 45e44f3..8e1edf0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -18,37 +18,34 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import java.net.URL;
-import java.util.List;
-
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.records.TezDAGID;
 
 public class DAGEventRecoverEvent extends DAGEvent {
 
   private final DAGState desiredState;
-  private final List<URL> additionalUrlsForClasspath;
+  private final DAGRecoveryData recoveredDagData;
 
-  public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState,
-      List<URL> additionalUrlsForClasspath) {
+  public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState, DAGRecoveryData recoveredDagData) {
     super(dagId, DAGEventType.DAG_RECOVER);
     this.desiredState = desiredState;
-    this.additionalUrlsForClasspath = additionalUrlsForClasspath;
+    this.recoveredDagData = recoveredDagData;
   }
   
-  public DAGEventRecoverEvent(TezDAGID dagId, List<URL> additionalUrlsForClasspath) {
-    this(dagId, null, additionalUrlsForClasspath);
+  public DAGEventRecoverEvent(TezDAGID dagId, DAGRecoveryData recoveredDagData) {
+    this(dagId, null, recoveredDagData);
   }
   
   public DAGState getDesiredState() {
     return desiredState;
   }
   
-  public List<URL> getAdditionalUrlsForClasspath() {
-    return this.additionalUrlsForClasspath;
-  }
-
   public boolean hasDesiredState() {
     return this.desiredState != null;
   }
+
+  public DAGRecoveryData getRecoveredDagData() {
+    return recoveredDagData;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java
new file mode 100644
index 0000000..cad3824
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/RecoveryEvent.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag.event;
+
+public interface RecoveryEvent {
+
+  public boolean isFromRecovery();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 7ec8921..21c6b14 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -22,10 +22,11 @@ import 
org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent 
-  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent {
 
   private final String diagnostics;
   private final TaskAttemptTerminationCause errorCause;
+  private boolean isFromRecovery = false;
 
   /* Accepted Types - FAILED, TIMED_OUT */
   public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
@@ -35,6 +36,14 @@ public class TaskAttemptEventAttemptFailed extends 
TaskAttemptEvent
     this.errorCause = errorCause;
   }
 
+  /* Accepted Types - FAILED, TIMED_OUT */
+  public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
+      TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause,
+      boolean isFromRecovery) {
+    this(id, type, diagnostics, errorCause);
+    this.isFromRecovery = isFromRecovery;
+  }
+
   @Override
   public String getDiagnosticInfo() {
     return diagnostics;
@@ -45,4 +54,8 @@ public class TaskAttemptEventAttemptFailed extends 
TaskAttemptEvent
     return errorCause;
   }
 
+  @Override
+  public boolean isFromRecovery() {
+    return isFromRecovery;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
index 72e6b07..4642443 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
@@ -22,10 +22,12 @@ import 
org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent
-    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent {
 
   private final String diagnostics;
   private final TaskAttemptTerminationCause errorCause;
+  private boolean fromRecovery;
+
   public TaskAttemptEventAttemptKilled(TezTaskAttemptID id,
                                        String diagnostics,
                                        TaskAttemptTerminationCause errorCause) 
{
@@ -34,6 +36,14 @@ public class TaskAttemptEventAttemptKilled extends 
TaskAttemptEvent
     this.errorCause = errorCause;
   }
 
+  public TaskAttemptEventAttemptKilled(TezTaskAttemptID id,
+      String diagnostics,
+      TaskAttemptTerminationCause errorCause,
+      boolean fromRecovery) {
+    this(id, diagnostics, errorCause);
+    this.fromRecovery = fromRecovery;
+  }
+  
   @Override
   public String getDiagnosticInfo() {
     return diagnostics;
@@ -44,4 +54,8 @@ public class TaskAttemptEventAttemptKilled extends 
TaskAttemptEvent
     return errorCause;
   }
 
+  @Override
+  public boolean isFromRecovery() {
+    return fromRecovery;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index a0dfe5d..96cf0e6 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -21,10 +21,11 @@ import 
org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventKillRequest extends TaskAttemptEvent 
-  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent {
 
   private final String message;
   private final TaskAttemptTerminationCause errorCause;
+  private boolean fromRecovery = false;
 
   public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, 
TaskAttemptTerminationCause err) {
     super(id, TaskAttemptEventType.TA_KILL_REQUEST);
@@ -32,6 +33,12 @@ public class TaskAttemptEventKillRequest extends 
TaskAttemptEvent
     this.errorCause = err;
   }
 
+  public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, TaskAttemptTerminationCause err,
+      boolean fromRecovery) {
+    this(id, message, err);
+    this.fromRecovery = fromRecovery;
+  }
+
   @Override
   public String getDiagnosticInfo() {
     return message;
@@ -42,4 +49,9 @@ public class TaskAttemptEventKillRequest extends 
TaskAttemptEvent
     return errorCause;
   }
 
+  @Override
+  public boolean isFromRecovery() {
+    return fromRecovery;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
index 825a143..e700c6c 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
@@ -24,11 +24,12 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
-public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent {
+public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent implements RecoveryEvent {
 
   private final ContainerId containerId;
   // TODO Can appAcls be handled elsewhere ?
   private final Map<ApplicationAccessType, String> applicationACLs;
+  private boolean fromRecovery = false;
 
   public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId 
containerId,
       Map<ApplicationAccessType, String> appAcls) {
@@ -37,6 +38,12 @@ public class TaskAttemptEventStartedRemotely extends 
TaskAttemptEvent {
     this.applicationACLs = appAcls;
   }
 
+  public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId,
+      Map<ApplicationAccessType, String> appAcls, boolean fromRecovery) {
+    this(id, containerId, appAcls);
+    this.fromRecovery = fromRecovery;
+  }
+
   public ContainerId getContainerId() {
     return containerId;
   }
@@ -45,4 +52,9 @@ public class TaskAttemptEventStartedRemotely extends 
TaskAttemptEvent {
     return applicationACLs;
   }
 
+  @Override
+  public boolean isFromRecovery() {
+    return fromRecovery;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java
new file mode 100644
index 0000000..bef9248
--- /dev/null
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTezEventUpdate.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class TaskAttemptEventTezEventUpdate extends TaskAttemptEvent {
+
+  private List<TezEvent> tezEvents;
+
+  public TaskAttemptEventTezEventUpdate(TezTaskAttemptID taId, List<TezEvent> tezEvents) {
+    super(taId, TaskAttemptEventType.TA_TEZ_EVENT_UPDATE);
+    this.tezEvents = tezEvents;
+  }
+
+  public List<TezEvent> getTezEvents() {
+    return tezEvents;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 6ba69e3..dacb0c2 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -29,6 +29,7 @@ public enum TaskAttemptEventType {
   //Producer: TaskAttemptListener | Vertex after routing events
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,
+  TA_TEZ_EVENT_UPDATE,  // for recovery
   TA_DONE,
   TA_FAILED,
   TA_KILLED, // Generated by TaskCommunicators
@@ -55,8 +56,5 @@ public enum TaskAttemptEventType {
   
   // Producer: consumer destination vertex
   TA_OUTPUT_FAILED,
-
-  // Recovery
-  TA_RECOVER,
   
 }

Reply via email to