http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
deleted file mode 100644
index f275a56..0000000
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.records.TezTaskID;
-
-public class TaskEventRecoverTask extends TaskEvent {
-
-  TaskState desiredState;
-
-  boolean recoverDataForAttempts;
-
-  public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState) {
-    this(taskID, desiredState, true);
-  }
-
-  public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState,
-      boolean recoverData) {
-    super(taskID, TaskEventType.T_RECOVER);
-    this.desiredState = desiredState;
-    this.recoverDataForAttempts = recoverData;
-  }
-
-  public TaskEventRecoverTask(TezTaskID taskID) {
-    this(taskID, null);
-  }
-
-  public TaskState getDesiredState() {
-    return desiredState;
-  }
-
-  public boolean recoverData() {
-    return recoverDataForAttempts;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
index 696602a..70d6043 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
@@ -22,14 +22,17 @@ import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
-public class TaskEventScheduleTask extends TaskEvent {
+public class TaskEventScheduleTask extends TaskEvent implements RecoveryEvent {
   private final TaskSpec baseTaskSpec;
   private final TaskLocationHint locationHint;
-  
-  public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, 
TaskLocationHint locationHint) {
+  private final boolean fromRecovery;
+
+  public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, 
TaskLocationHint locationHint,
+      boolean fromRecovery) {
     super(taskId, TaskEventType.T_SCHEDULE);
     this.baseTaskSpec = baseTaskSpec;
     this.locationHint = locationHint;
+    this.fromRecovery = fromRecovery;
   }
   
   public TaskSpec getBaseTaskSpec() {
@@ -39,4 +42,9 @@ public class TaskEventScheduleTask extends TaskEvent {
   public TaskLocationHint getTaskLocationHint() {
     return locationHint;
   }
+
+  @Override
+  public boolean isFromRecovery() {
+    return fromRecovery;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
index d48a0bf..1605869 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -22,11 +22,12 @@ import 
org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskID;
 
 public class TaskEventTermination extends TaskEvent implements 
DiagnosableEvent,
-    TaskAttemptEventTerminationCauseEvent {
+    TaskAttemptEventTerminationCauseEvent, RecoveryEvent {
 
   private final String diagnostics;
   private final TaskAttemptTerminationCause errorCause;
-  
+  private boolean fromRecovery;
+
   public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause 
errorCause, String diagnostics) {
     super(taskID, TaskEventType.T_TERMINATE);
     this.errorCause = errorCause;
@@ -37,6 +38,12 @@ public class TaskEventTermination extends TaskEvent 
implements DiagnosableEvent,
     }
   }
 
+  public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause 
errorCause, String diagnostics,
+      boolean fromRecovery) {
+    this(taskID, errorCause, diagnostics);
+    this.fromRecovery = fromRecovery;
+  }
+
   @Override
   public String getDiagnosticInfo() {
     return diagnostics;
@@ -47,4 +54,9 @@ public class TaskEventTermination extends TaskEvent 
implements DiagnosableEvent,
     return errorCause;
   }
 
+  @Override
+  public boolean isFromRecovery() {
+    return fromRecovery;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index baec5f0..726e13e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -38,7 +38,4 @@ public enum TaskEventType {
   T_ATTEMPT_SUCCEEDED,
   T_ATTEMPT_KILLED,
 
-  // Recovery event
-  T_RECOVER
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
index 34e45fe..4203689 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
@@ -32,5 +32,4 @@ public class VertexEventRecoverVertex extends VertexEvent {
   public VertexState getDesiredState() {
     return desiredState;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index 69195db..211202d 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -27,25 +27,13 @@ public class VertexEventRouteEvent extends VertexEvent {
   
   final List<TezEvent> events;
 
-  final boolean recovered;
-
   public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
-    this(vertexId, events, false);
-  }
-
-  public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events,
-      boolean recovered) {
     super(vertexId, VertexEventType.V_ROUTE_EVENT);
     this.events = events;
-    this.recovered = recovered;
   }
 
   public List<TezEvent> getEvents() {
     return events;
   }
 
-  public boolean isRecovered() {
-    return recovered;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
deleted file mode 100644
index e3b9334..0000000
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.app.dag.VertexState;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-
-import java.util.List;
-
-public class VertexEventSourceVertexRecovered extends VertexEvent {
-
-  VertexState sourceVertexState;
-  TezVertexID sourceVertexID;
-  List<TezTaskAttemptID> completedTaskAttempts;
-  int sourceDistanceFromRoot;
-
-  public VertexEventSourceVertexRecovered(TezVertexID vertexID,
-      TezVertexID sourceVertexID,
-      VertexState sourceVertexState,
-      List<TezTaskAttemptID> completedTaskAttempts,
-      int sourceDistanceFromRoot) {
-    super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
-    this.sourceVertexState = sourceVertexState;
-    this.sourceVertexID = sourceVertexID;
-    this.completedTaskAttempts = completedTaskAttempts;
-    this.sourceDistanceFromRoot = sourceDistanceFromRoot;
-  }
-
-  public VertexState getSourceVertexState() {
-    return sourceVertexState;
-  }
-
-  public TezVertexID getSourceVertexID() {
-    return sourceVertexID;
-  }
-
-  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
-    return completedTaskAttempts;
-  }
-
-  public int getSourceDistanceFromRoot() {
-    return sourceDistanceFromRoot;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 6ea945b..15be94d 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -57,9 +57,6 @@ public enum VertexEventType {
   
   // Producer: Vertex
   V_READY_TO_INIT,
-
-  // Recover Event, Producer:Vertex
-  V_SOURCE_VERTEX_RECOVERED,
   
   // Producer: Edge
   V_NULL_EDGE_INITIALIZED,

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 4dfba84..f395e62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -31,7 +31,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -82,7 +81,9 @@ import 
org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGReport;
@@ -112,11 +113,11 @@ import 
org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
@@ -204,10 +205,6 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
   private final List<String> diagnostics = new ArrayList<String>();
 
-  // Recovery related flags
-  boolean recoveryInitEventSeen = false;
-  boolean recoveryStartEventSeen = false;
-
   private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
 
   private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new 
DagStateChangedCallback();
@@ -237,10 +234,11 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
           .addTransition(DAGState.NEW, DAGState.NEW,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          // either recovered to FINISHED state or recovered to NEW to rerun 
the dag based on the recovery data
           .addTransition(DAGState.NEW,
-              EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING,
-                  DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
-                  DAGState.ERROR, DAGState.TERMINATING),
+              EnumSet.of(DAGState.NEW, DAGState.SUCCEEDED,
+                  DAGState.FAILED, DAGState.KILLED,
+                  DAGState.ERROR),
               DAGEventType.DAG_RECOVER,
               new RecoverTransition())
           .addTransition(DAGState.NEW, DAGState.NEW,
@@ -448,11 +446,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
   Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
-  private DAGState recoveredState = DAGState.NEW;
-
-  @VisibleForTesting
-  boolean recoveryCommitInProgress = false;
-  Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
+  private DAGRecoveryData recoveryData;
 
   static class VertexGroupInfo {
     String groupName;
@@ -637,59 +631,6 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
-  public DAGState restoreFromEvent(HistoryEvent historyEvent) {
-    writeLock.lock();
-    try {
-      switch (historyEvent.getEventType()) {
-        case DAG_INITIALIZED:
-          recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
-          recoveryInitEventSeen = true;
-          return recoveredState;
-        case DAG_STARTED:
-          if (!recoveryInitEventSeen) {
-            throw new RuntimeException("Started Event seen but"
-                + " no Init Event was encountered earlier");
-          }
-          recoveryStartEventSeen = true;
-          this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
-          recoveredState = DAGState.RUNNING;
-          return recoveredState;
-        case DAG_COMMIT_STARTED:
-          recoveryCommitInProgress = true;
-          return recoveredState;
-        case VERTEX_GROUP_COMMIT_STARTED:
-          VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
-              (VertexGroupCommitStartedEvent) historyEvent;
-          recoveredGroupCommits.put(
-              vertexGroupCommitStartedEvent.getVertexGroupName(), false);
-          return recoveredState;
-        case VERTEX_GROUP_COMMIT_FINISHED:
-          VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
-              (VertexGroupCommitFinishedEvent) historyEvent;
-          recoveredGroupCommits.put(
-              vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
-          return recoveredState;
-        case DAG_KILL_REQUEST:
-          trySetTerminationCause(DAGTerminationCause.DAG_KILL);
-          this.recoveredState = DAGState.KILLED;
-          return recoveredState;
-        case DAG_FINISHED:
-          recoveryCommitInProgress = false;
-          DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
-          setFinishTime(finishedEvent.getFinishTime());
-          recoveredState = finishedEvent.getState();
-          this.fullCounters = finishedEvent.getTezCounters();
-          return recoveredState;
-        default:
-          throw new RuntimeException("Unexpected event received for restoring"
-              + " state, eventType=" + historyEvent.getEventType());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
   public ACLManager getACLManager() {
     return this.aclManager;
   }
@@ -1241,39 +1182,50 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
     return taskStats;
   }
 
-  void logJobHistoryFinishedEvent() throws IOException {
-    this.setFinishTime();
-    Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
-    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
-        finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
-        this.userName, this.dagName, taskStats, 
this.appContext.getApplicationAttemptId());
-    this.appContext.getHistoryHandler().handleCriticalEvent(
-        new DAGHistoryEvent(dagId, finishEvt));
-  }
-
   void logJobHistoryInitedEvent() {
-    DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
-        this.initTime, this.userName, this.dagName, 
this.getVertexNameIDMapping());
-    this.appContext.getHistoryHandler().handle(
-        new DAGHistoryEvent(dagId, initEvt));
+    // Skip logging when the recovery data already contains a DAGInitializedEvent.
+    // NOTE(review): the timestamp is read from the clock at logging time rather
+    // than from initTime - confirm that this is intended.
+    if (recoveryData == null
+        || recoveryData.getDAGInitializedEvent() == null) {
+      DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
+          clock.getTime(), this.userName, this.dagName, 
this.getVertexNameIDMapping());
+      this.appContext.getHistoryHandler().handle(
+          new DAGHistoryEvent(dagId, initEvt));
+    }
   }
 
   void logJobHistoryStartedEvent() {
-    DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
-        this.startTime, this.userName, this.dagName);
-    this.appContext.getHistoryHandler().handle(
-        new DAGHistoryEvent(dagId, startEvt));
+    // Skip logging when the recovery data already contains a DAGStartedEvent.
+    if (recoveryData == null
+        || recoveryData.getDAGStartedEvent() == null) {
+      // Report the start time stored on the DAG rather than a fresh clock
+      // reading, so the history event agrees with the DAG's own startTime.
+      DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
+          this.startTime, this.userName, this.dagName);
+      this.appContext.getHistoryHandler().handle(
+          new DAGHistoryEvent(dagId, startEvt));
+    }
+  }
+
+  void logJobHistoryFinishedEvent() throws IOException {
+    // Skip logging when the recovery data already contains a DAGFinishedEvent.
+    // NOTE(review): assumes finishTime has been set before this is called - confirm.
+    if (recoveryData == null
+        || recoveryData.getDAGFinishedEvent() == null) {
+      Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+      // The second constructor argument is the DAG start time; pass the stored
+      // startTime, not the current clock value.
+      DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
+          finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
+          this.userName, this.dagName, taskStats,
+          this.appContext.getApplicationAttemptId());
+      this.appContext.getHistoryHandler().handleCriticalEvent(
+          new DAGHistoryEvent(dagId, finishEvt));
+    }
   }
 
   void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
-    Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
-    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
-        clock.getTime(), state,
-        StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
-        getAllCounters(), this.userName, this.dagName, taskStats,
-        this.appContext.getApplicationAttemptId());
-    this.appContext.getHistoryHandler().handleCriticalEvent(
-        new DAGHistoryEvent(dagId, finishEvt));
+    // Skip logging when the recovery data already contains a DAGFinishedEvent.
+    if (recoveryData == null
+        || recoveryData.getDAGFinishedEvent() == null) {
+      Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+      // Keep the DAG's stored startTime in the event (a literal 0 would drop
+      // it); the finish time is the moment the unsuccessful end is recorded.
+      DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
+          clock.getTime(), state,
+          StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
+          getAllCounters(), this.userName, this.dagName, taskStats,
+          this.appContext.getApplicationAttemptId());
+      this.appContext.getHistoryHandler().handleCriticalEvent(
+          new DAGHistoryEvent(dagId, finishEvt));
+    }
   }
 
   // triggered by vertex_complete
@@ -1474,17 +1426,8 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  public DAGState initializeDAG() {
-    return initializeDAG(null);
-  }
-
-  DAGState initializeDAG(DAGInitializedEvent event) {
-    if (event != null) {
-      initTime = event.getInitTime();
-    } else {
-      initTime = clock.getTime();
-    }
 
+  DAGState initializeDAG() {
     commitAllOutputsOnSuccess = dagConf.getBoolean(
         TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
@@ -1494,9 +1437,6 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
     if (numVertices == 0) {
       addDiagnostic("No vertices for dag");
       trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
-      if (event != null) {
-        return DAGState.FAILED;
-      }
       return finished(DAGState.FAILED);
     }
 
@@ -1668,139 +1608,68 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
     vertex.setOutputVertices(outVertices);
   }
 
+  /**
+   * 2 cases of recovery:
+   * <ul>
+   * <li>
+   * 1. For a completed dag, recover the dag and its vertices to the desired state,
+   *    but not its tasks and task attempts. This recovery is a synchronous call (after
+   *    this transition, the DAG and its vertices are all recovered to the desired state).
+   * </li>
+   * <li>
+   * 2. For a non-completed dag, recover the dag as a normal dag execution. The only
+   *    difference is that the recoveryData is set before the DAG_INIT event is sent, so
+   *    that some steps in the execution are skipped based on the recoveryData.
+   * </li>
+   * </ul>
+   */
   private static class RecoverTransition
       implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
     @Override
     public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
-      DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+      DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent)dagEvent;
+      // With desired state, represents the case that DAG is completed
       if (recoverEvent.hasDesiredState()) {
-        // DAG completed or final end state known
-        dag.recoveredState = recoverEvent.getDesiredState();
-      }
-      if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
-        LOG.info("Added additional resources : [" + 
recoverEvent.getAdditionalUrlsForClasspath()
-            + "] to classpath");
-        
RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
-      }
-
-      switch (dag.recoveredState) {
-        case NEW:
-          // send DAG an Init and start events
-          dag.eventHandler.handle(new DAGEvent(dag.getID(), 
DAGEventType.DAG_INIT));
-          dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null));
-          return DAGState.NEW;
-        case INITED:
-          // DAG inited but not started
-          // This implies vertices need to be sent init event
-          // Root vertices need to be sent start event
-          // The vertices may already have been sent these events but the
-          // DAG start may not have been persisted
-          for (Vertex v : dag.vertices.values()) {
-            if (v.getInputVerticesCount() == 0) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Sending Running Recovery event to root vertex "
-                    + v.getLogIdentifier());
-              }
-              dag.eventHandler.handle(new 
VertexEventRecoverVertex(v.getVertexId(),
-                  VertexState.RUNNING));
-            }
-          }
-          return DAGState.RUNNING;
-        case RUNNING:
-          // if commit is in progress, DAG should fail as commits are not
-          // recoverable
-          boolean groupCommitInProgress = false;
-          if (!dag.recoveredGroupCommits.isEmpty()) {
-            for (Entry<String, Boolean> entry : 
dag.recoveredGroupCommits.entrySet()) {
-              if (!entry.getValue().booleanValue()) {
-                LOG.info("Found a pending Vertex Group commit"
-                    + ", vertexGroup=" + entry.getKey());
-                groupCommitInProgress = true;
-                break;
-              }
-            }
-          }
-
-          if (groupCommitInProgress || dag.recoveryCommitInProgress) {
-            // Fail the DAG as we have not seen a commit completion
-            dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
-            dag.setFinishTime();
-            // Recover all other data for all vertices
-            // send recover event to all vertices with a final end state
-            for (Vertex v : dag.vertices.values()) {
-              VertexState desiredState = VertexState.SUCCEEDED;
-              if (dag.recoveredState.equals(DAGState.KILLED)) {
-                desiredState = VertexState.KILLED;
-              } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains(
-                  dag.recoveredState)) {
-                desiredState = VertexState.FAILED;
-              }
-              dag.eventHandler.handle(new 
VertexEventRecoverVertex(v.getVertexId(),
-                  desiredState));
-            }
-            DAGState endState = DAGState.FAILED;
-            try {
-              dag.logJobHistoryUnsuccesfulEvent(endState);
-            } catch (IOException e) {
-              LOG.warn("Failed to persist recovery event for DAG completion"
-                  + ", dagId=" + dag.dagId
-                  + ", finalState=" + endState);
-            }
-            dag.eventHandler.handle(new 
DAGAppMasterEventDAGFinished(dag.getID(),
-                endState));
-            return endState;
-          }
-
-          for (Vertex v : dag.vertices.values()) {
-            if (v.getInputVerticesCount() == 0) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Sending Running Recovery event to root vertex "
-                    + v.getLogIdentifier());
-              }
-              dag.eventHandler.handle(new 
VertexEventRecoverVertex(v.getVertexId(),
-                  VertexState.RUNNING));
-            }
-          }
-          return DAGState.RUNNING;
+        VertexState vertexDesiredState = null;
+        switch (recoverEvent.getDesiredState()) {
         case SUCCEEDED:
-        case ERROR:
+          vertexDesiredState = VertexState.SUCCEEDED;
+          break;
         case FAILED:
+          vertexDesiredState = VertexState.FAILED;
+          break;
         case KILLED:
-          // Completed
-          
-          // Recover all other data for all vertices
-          // send recover event to all vertices with a final end state
-          for (Vertex v : dag.vertices.values()) {
-            VertexState desiredState = VertexState.SUCCEEDED;
-            if (dag.recoveredState.equals(DAGState.KILLED)) {
-              desiredState = VertexState.KILLED;
-            } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains(
-                dag.recoveredState)) {
-              desiredState = VertexState.FAILED;
-            }
-            dag.eventHandler.handle(new 
VertexEventRecoverVertex(v.getVertexId(),
-                desiredState));
-          }
-
-          // Let us inform AM of completion
-          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
-              dag.recoveredState));
-
-          LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
-              + dag.recoveredState);
-          return dag.recoveredState;
+          vertexDesiredState = VertexState.KILLED;
+          break;
+        case ERROR:
+          vertexDesiredState = VertexState.ERROR;
+          break;
         default:
-          // Error state
-          LOG.warn("Trying to recover DAG, failed to recover"
-              + " from non-handled state" + dag.recoveredState);
-          // Tell AM ERROR so that it can shutdown
-          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
-              DAGState.ERROR));
-          return DAGState.FAILED;
+          String msg = "Invalid desired state of DAG"
+              + ", dagName=" + dag.getName()
+              + ", state=" + recoverEvent.getDesiredState();
+          LOG.warn(msg);
+          dag.addDiagnostic(msg);
+          return dag.finished(DAGState.ERROR);
+        }
+        // Initialize dag synchronously to generate the vertices and recover 
its vertices to the desired state.
+        dag.initializeDAG();
+        for (Vertex v : dag.vertexMap.values()) {
+          dag.eventHandler.handle(new 
VertexEventRecoverVertex(v.getVertexId(), vertexDesiredState));
+        }
+        dag.addDiagnostic("DAG is recovered to finished state:" + 
recoverEvent.getDesiredState()
+            + ", but will only recover partial data due to incomplete recovery 
data");
+        return dag.finished(recoverEvent.getDesiredState());
       }
-    }
 
+      // for the cases that DAG is not completed, recover it as normal dag 
execution.
+      dag.recoveryData = recoverEvent.getRecoveredDagData();
+      dag.appContext.setDAGRecoveryData(dag.recoveryData);
+      dag.getEventHandler().handle(new DAGEvent(dag.getID(), 
DAGEventType.DAG_INIT));
+      dag.getEventHandler().handle(new DAGEventStartDag(dag.getID(), 
dag.recoveryData.additionalUrlsForClasspath));
+      return DAGState.NEW;
+    }
   }
 
   private static class InitTransition
@@ -1818,6 +1687,11 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       // TODO Metrics
       //dag.metrics.submittedJob(dag);
       //dag.metrics.preparingJob(dag);
+      if (dag.recoveryData != null && 
dag.recoveryData.getDAGInitializedEvent() != null) {
+        dag.initTime = dag.recoveryData.getDAGInitializedEvent().getInitTime();
+      } else {
+        dag.initTime = dag.clock.getTime();
+      }
       dag.startDAGCpuTime = dag.appContext.getCumulativeCPUTime();
       dag.startDAGGCTime = dag.appContext.getCumulativeGCTime();
 
@@ -1845,9 +1719,12 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
      */
     @Override
     public void transition(DAGImpl dag, DAGEvent event) {
+      if (dag.recoveryData != null && dag.recoveryData.getDAGStartedEvent() != 
null) {
+        dag.startTime = dag.recoveryData.getDAGStartedEvent().getStartTime();
+      } else {
+        dag.startTime = dag.clock.getTime();
+      }
       DAGEventStartDag startEvent = (DAGEventStartDag) event;
-      dag.startTime = dag.clock.getTime();
-      dag.logJobHistoryStartedEvent();
       List<URL> additionalUrlsForClasspath = 
startEvent.getAdditionalUrlsForClasspath();
       if (additionalUrlsForClasspath != null) {
         LOG.info("Added additional resources : [" + additionalUrlsForClasspath 
 + "] to classpath");
@@ -1858,6 +1735,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
       // Start all vertices with no incoming edges when job starts
       dag.initializeVerticesAndStart();
+      dag.logJobHistoryStartedEvent();
     }
   }
 
@@ -2032,6 +1910,14 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
   }
 
+  private Collection<TezVertexID> getVertexIds(Collection<String> vertexNames) 
{
+    List<TezVertexID> vertexIds = new 
ArrayList<TezVertexID>(vertexNames.size());
+    for (String name : vertexNames) {
+      vertexIds.add(getVertexNameIDMapping().get(name));
+    }
+    return vertexIds;
+  }
+
   private static class VertexReRunningTransition implements
     MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
@@ -2078,17 +1964,30 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
           }
         }
         for (VertexGroupInfo groupInfo : commitList) {
-          if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
+          if (recoveryData != null && 
recoveryData.isVertexGroupCommitted(groupInfo.groupName)) {
             LOG.info("VertexGroup was already committed as per recovery"
                 + " data, groupName=" + groupInfo.groupName);
+            for (String vertexName : groupInfo.groupMembers) {
+              VertexRecoveryData vertexRecoveryData =
+                  
recoveryData.getVertexRecoveryData(getVertex(vertexName).getVertexId());
+              Preconditions.checkArgument(vertexRecoveryData != null,"Vertex 
Group has been committed"
+                  + ", but no VertexRecoveryData found for its vertex " + 
vertexName);
+              VertexFinishedEvent vertexFinishedEvent = 
vertexRecoveryData.getVertexFinishedEvent();
+              Preconditions.checkArgument(vertexFinishedEvent!= null,"Vertex 
Group has been committed"
+                  + ", but no VertexFinishedEvent found in its vertex " + 
vertexName);
+              Preconditions.checkArgument(vertexFinishedEvent.getState() == 
VertexState.SUCCEEDED,
+                  "Vertex Group has been committed, but unexpected vertex 
state of its vertex "
+                  + vertexName + ", vertexstate=" + 
vertexFinishedEvent.getState());
+            }
             continue;
           }
           groupInfo.commitStarted = true;
           final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
           try {
+            Collection<TezVertexID> vertexIds = 
getVertexIds(groupInfo.groupMembers);
             appContext.getHistoryHandler().handleCriticalEvent(new 
DAGHistoryEvent(getID(),
                 new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
-                    clock.getTime())));
+                    vertexIds, clock.getTime())));
           } catch (IOException e) {
             LOG.error("Failed to send commit recovery event to handler", e);
             recoveryFailed = true;
@@ -2269,9 +2168,10 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
         if (vertexGroup.isCommitted()) {
           if (!commitAllOutputsOnSuccess) {
             try {
+              Collection<TezVertexID> vertexIds = 
getVertexIds(vertexGroup.groupMembers);
               appContext.getHistoryHandler().handleCriticalEvent(new 
DAGHistoryEvent(getID(),
                   new VertexGroupCommitFinishedEvent(getID(), 
commitCompletedEvent.getOutputKey().getEntityName(),
-                      clock.getTime())));
+                      vertexIds, clock.getTime())));
             } catch (IOException e) {
               String diag = "Failed to send commit recovery event to handler, 
" + ExceptionUtils.getStackTrace(e);
               addDiagnostic(diag);

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index bfd1634..957abcf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -61,6 +61,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.Task;
@@ -72,9 +73,13 @@ import 
org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+import org.apache.tez.dag.app.dag.event.RecoveryEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
@@ -89,7 +94,6 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -109,6 +113,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -166,6 +171,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Lock writeLock;
   protected final AppContext appContext;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
+  private final TaskAttemptRecoveryData recoveryData;
   private long launchTime = 0;
   private long finishTime = 0;
   private String trackerName;
@@ -191,10 +197,12 @@ public class TaskAttemptImpl implements TaskAttempt,
   private DAGCounter localityCounter;
   
   org.apache.tez.runtime.api.impl.TaskStatistics statistics;
-  
+
   long lastNotifyProgressTimestamp = 0;
   private final long hungIntervalMax;
 
+  private List<TezEvent> taGeneratedEvents = Lists.newArrayList();
+
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
   Set<String> taskRacks = new HashSet<String>();
@@ -240,20 +248,29 @@ public class TaskAttemptImpl implements TaskAttempt,
             (TaskAttemptStateInternal.NEW)
 
       .addTransition(TaskAttemptStateInternal.NEW,
-          EnumSet.of(TaskAttemptStateInternal.START_WAIT, 
TaskAttemptStateInternal.FAILED),
+          EnumSet.of(TaskAttemptStateInternal.NEW, 
TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED),
           TaskAttemptEventType.TA_SCHEDULE, new 
ScheduleTaskattemptTransition())
+       // NEW -> FAILED due to TA_FAILED happens in recovery 
+       // (No TaskAttemptStartedEvent, but with TaskAttemptFinishedEvent(FAILED))
+      .addTransition(TaskAttemptStateInternal.NEW,
+              TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_FAILED, new 
TerminateTransition(FAILED_HELPER))
       .addTransition(TaskAttemptStateInternal.NEW,
           TaskAttemptStateInternal.KILLED,
           TaskAttemptEventType.TA_KILL_REQUEST,
           new TerminateTransition(KILLED_HELPER))
-
+      // NEW -> KILLED due to TA_KILLED happens in recovery
+      // (No TaskAttemptStartedEvent, but with TaskAttemptFinishedEvent(KILLED))
       .addTransition(TaskAttemptStateInternal.NEW,
-          EnumSet.of(TaskAttemptStateInternal.NEW,
-              TaskAttemptStateInternal.RUNNING,
-              TaskAttemptStateInternal.KILLED,
-              TaskAttemptStateInternal.FAILED,
-              TaskAttemptStateInternal.SUCCEEDED),
-          TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_KILLED,
+          new TerminateTransition(KILLED_HELPER))
+      // NEW -> SUCCEEDED due to TA_DONE happens in recovery
+      // (with TaskAttemptStartedEvent and with TaskAttemptFinishedEvent(SUCCEEDED))
+      .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.SUCCEEDED,
+          TaskAttemptEventType.TA_DONE,
+          new SucceededTransition())
 
       .addTransition(TaskAttemptStateInternal.START_WAIT,
           TaskAttemptStateInternal.RUNNING,
@@ -328,6 +345,11 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptStateInternal.RUNNING),
           TaskAttemptEventType.TA_OUTPUT_FAILED,
           new OutputReportedFailedTransition())
+       // for recovery, needs to log the TA generated events in TaskAttemptFinishedEvent
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.RUNNING,
+          TaskAttemptEventType.TA_TEZ_EVENT_UPDATE,
+          new TezEventUpdaterTransition())
 
       .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
           TaskAttemptStateInternal.KILLED,
@@ -434,9 +456,6 @@ public class TaskAttemptImpl implements TaskAttempt,
 
         .installTopology();
 
-  private TaskAttemptState recoveredState = TaskAttemptState.NEW;
-  private boolean recoveryStartEventSeen = false;
-
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler 
eventHandler,
       TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, 
Configuration conf, Clock clock,
@@ -493,6 +512,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 
         TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
 
+    this.recoveryData = appContext.getDAGRecoveryData() == null ?
+        null : 
appContext.getDAGRecoveryData().getTaskAttemptRecoveryData(attemptId);
   }
 
   @Override
@@ -521,14 +542,6 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
     TaskSpec baseTaskSpec = task.getBaseTaskSpec();
-    if (baseTaskSpec == null) {
-      // since recovery does not follow normal transitions, 
TaskEventScheduleTask
-      // is not being honored by the recovery code path. Using this to 
workaround 
-      // until recovery is fixed. Calling the non-locking internal method of 
the vertex
-      // to get the taskSpec directly. Since everything happens on the central 
dispatcher 
-      // during recovery this is deadlock free for now. TEZ-1019 should remove 
the need for this.
-      baseTaskSpec = ((VertexImpl) 
vertex).createRemoteTaskSpec(getID().getTaskID().getId());
-    }
     return new TaskSpec(getID(),
         baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
         baseTaskSpec.getVertexParallelism(), 
baseTaskSpec.getProcessorDescriptor(),
@@ -839,52 +852,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
-  @Override
-  public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) {
-    writeLock.lock();
-    try {
-      switch (historyEvent.getEventType()) {
-        case TASK_ATTEMPT_STARTED:
-        {
-          TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) 
historyEvent;
-          this.launchTime = tEvent.getStartTime();
-          recoveryStartEventSeen = true;
-          recoveredState = TaskAttemptState.RUNNING;
-          this.containerId = tEvent.getContainerId();
-          sendEvent(createDAGCounterUpdateEventTALaunched(this));
-          return recoveredState;
-        }
-        case TASK_ATTEMPT_FINISHED:
-        {
-          TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) 
historyEvent;
-          this.creationTime = tEvent.getCreationTime();
-          this.allocationTime = tEvent.getAllocationTime();
-          this.launchTime = tEvent.getStartTime();
-          this.finishTime = tEvent.getFinishTime();
-          this.creationCausalTA = tEvent.getCreationCausalTA();
-          this.reportedStatus.counters = tEvent.getCounters();
-          this.reportedStatus.progress = 1f;
-          this.reportedStatus.state = tEvent.getState();
-          this.terminationCause = tEvent.getTaskAttemptError() != null ? 
tEvent.getTaskAttemptError()
-              : TaskAttemptTerminationCause.UNKNOWN_ERROR;
-          this.diagnostics.add(tEvent.getDiagnostics());
-          this.recoveredState = tEvent.getState();
-          if (tEvent.getDataEvents() != null) {
-            this.lastDataEvents.addAll(tEvent.getDataEvents());
-          }
-          sendEvent(createDAGCounterUpdateEventTAFinished(this, 
tEvent.getState()));
-          return recoveredState;
-        }
-        default:
-          throw new RuntimeException("Unexpected event received for restoring"
-              + " state, eventType=" + historyEvent.getEventType());
-  
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     this.eventHandler.handle(event);
@@ -1055,6 +1022,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   protected void logJobHistoryAttemptStarted() {
+    Preconditions.checkArgument(recoveryData == null);
     final String containerIdStr = containerId.toString();
     String inProgressLogsUrl = nodeHttpAddress
        + "/" + "node/containerlogs"
@@ -1081,13 +1049,16 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal 
state) {
-    //Log finished events only if an attempt started.
+    Preconditions.checkArgument(recoveryData == null
+        || recoveryData.getTaskAttemptFinishedEvent() == null,
+        "log TaskAttemptFinishedEvent again in recovery when there's already 
another TaskAtttemptFinishedEvent");
     if (getLaunchTime() == 0) return;
 
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, null,
-        "", getCounters(), lastDataEvents, creationTime, creationCausalTA, 
allocationTime);
+        "", getCounters(), lastDataEvents, taGeneratedEvents,
+        creationTime, creationCausalTA, allocationTime);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1095,6 +1066,9 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   protected void logJobHistoryAttemptUnsuccesfulCompletion(
       TaskAttemptState state) {
+    Preconditions.checkArgument(recoveryData == null
+        || recoveryData.getTaskAttemptFinishedEvent() == null,
+        "log TaskAttemptFinishedEvent again in recovery when there's already 
another TaskAtttemptFinishedEvent");
     long finishTime = getFinishTime();
     if (finishTime <= 0) {
       finishTime = clock.getTime(); // comes here in case it was terminated 
before launch
@@ -1104,8 +1078,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         finishTime, state,
         terminationCause,
         StringUtils.join(
-            getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, 
-        creationTime, creationCausalTA, allocationTime);
+            getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents,
+        taGeneratedEvents, creationTime, creationCausalTA, allocationTime);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1116,12 +1090,69 @@ public class TaskAttemptImpl implements TaskAttempt,
   
//////////////////////////////////////////////////////////////////////////////
 
   protected static class ScheduleTaskattemptTransition implements
-      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, 
TaskAttemptStateInternal> {
+    MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, 
TaskAttemptStateInternal> {
 
     @Override
     public TaskAttemptStateInternal transition(TaskAttemptImpl ta, 
TaskAttemptEvent event) {
-      TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) 
event;
+      if (ta.recoveryData != null) {
+        TaskAttemptStartedEvent taStartedEvent =
+            ta.recoveryData.getTaskAttemptStartedEvent();
+        if (taStartedEvent != null) {
+          ta.launchTime = taStartedEvent.getStartTime();
+          TaskAttemptFinishedEvent taFinishedEvent =
+              ta.recoveryData.getTaskAttemptFinishedEvent();
+          if (taFinishedEvent == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Only TaskAttemptStartedEvent but no 
TaskAttemptFinishedEvent, "
+                  + "send out TaskAttemptEventAttemptKilled to move it to 
KILLED");
+            }
+            ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.getID(), 
+                "Task Attempt killed in recovery due to can't recover the 
running task attempt",
+                TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true));
+            return TaskAttemptStateInternal.NEW;
+          }
+        }
+        // No matter whether TaskAttemptStartedEvent is seen, send corresponding event to move
+        // TA to the state of TaskAttemptFinishedEvent
+        TaskAttemptFinishedEvent taFinishedEvent =
+            ta.recoveryData.getTaskAttemptFinishedEvent();
+        Preconditions.checkArgument(taFinishedEvent != null, "Both of 
TaskAttemptStartedEvent and TaskFinishedEvent is null,"
+            + "taskAttemptId=" + ta.getID());
+        switch (taFinishedEvent.getState()) {
+          case FAILED:
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("TaskAttemptFinishedEvent is seen with state of FAILED"
+                  + ", send TA_FAILED to itself"
+                  + ", attemptId=" + ta.attemptId);
+            }
+            ta.sendEvent(new TaskAttemptEventAttemptFailed(ta.getID(), 
TaskAttemptEventType.TA_FAILED,
+                taFinishedEvent.getDiagnostics(), 
taFinishedEvent.getTaskAttemptError(), true));
+            break;
+          case KILLED:
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("TaskAttemptFinishedEvent is seen with state of KILLED"
+                  + ", send TA_KILLED to itself"
+                  + ", attemptId=" + ta.attemptId);
+            }
+            ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.getID(),
+                taFinishedEvent.getDiagnostics(), 
taFinishedEvent.getTaskAttemptError(), true));
+            break;
+          case SUCCEEDED:
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("TaskAttemptFinishedEvent is seen with state of 
SUCCEEDED"
+                  + ", send TA_DONE to itself"
+                  + ", attemptId=" + ta.attemptId);
+            }
+            ta.sendEvent(new TaskAttemptEvent(ta.getID(), 
TaskAttemptEventType.TA_DONE));
+            break;
+          default:
+            throw new TezUncheckedException("Invalid state in 
TaskAttemptFinishedEvent, state=" 
+                + taFinishedEvent.getState() + ", taId=" + ta.getID());
+        }
+        return TaskAttemptStateInternal.NEW;
+      }
 
+      TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) 
event;
       ta.scheduledTime = ta.clock.getTime();
       // TODO Creating the remote task here may not be required in case of
       // recovery.
@@ -1212,7 +1243,14 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       // This transition should not be invoked directly, if a scheduler event has already been sent out.
       // Sub-classes should be used if a scheduler request has been sent.
-      ta.setFinishTime();
+      if (ta.recoveryData == null ||
+          ta.recoveryData.getTaskAttemptFinishedEvent() == null) {
+        ta.setFinishTime();
+        ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
+            .getTaskAttemptState());
+      } else {
+        ta.finishTime = 
ta.recoveryData.getTaskAttemptFinishedEvent().getFinishTime();
+      }
 
       if (event instanceof DiagnosableEvent) {
         ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
@@ -1225,11 +1263,16 @@ public class TaskAttemptImpl implements TaskAttempt,
             + ", requiredClass=TaskAttemptEventTerminationCauseEvent"
             + ", eventClass=" + event.getClass().getName());
       }
-
+      if (event instanceof RecoveryEvent) {
+        RecoveryEvent rEvent = (RecoveryEvent)event;
+        if (rEvent.isFromRecovery()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Faked TerminateEvent from recovery, taskAttemptId=" + 
ta.getID());
+          }
+        }
+      }
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           helper.getTaskAttemptState()));
-      ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
-          .getTaskAttemptState());
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
           .getTaskEventType(), event));
@@ -1413,14 +1456,42 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
+  protected static class TezEventUpdaterTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      TaskAttemptEventTezEventUpdate tezEventUpdate = 
(TaskAttemptEventTezEventUpdate)event;
+      ta.taGeneratedEvents.addAll(tezEventUpdate.getTezEvents());
+    }
+  }
+
   protected static class SucceededTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
 
-      ta.setFinishTime();
-      // Send out history event.
-      ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
+      // If TaskAttempt is recovered to SUCCEEDED, send events generated by this TaskAttempt to vertex
+      // for its downstream consumers. For normal dag execution, the events are sent by TaskAttemptListener
+      // for performance consideration.
+      if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) 
{
+        TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData
+            .getTaskAttemptFinishedEvent();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TaskAttempt is recovered to SUCCEEDED, attemptId=" + 
ta.attemptId);
+        }
+        ta.reportedStatus.counters = taFinishedEvent.getCounters();
+        List<TezEvent> tezEvents = taFinishedEvent.getTAGeneratedEvents();
+        if (tezEvents != null && !tezEvents.isEmpty()) {
+          ta.sendEvent(new VertexEventRouteEvent(ta.getVertexID(), tezEvents));
+        }
+        ta.finishTime = taFinishedEvent.getFinishTime();
+      } else {
+        ta.setFinishTime();
+        // Send out history event.
+        
ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
+      }
+
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           TaskAttemptState.SUCCEEDED));
 
@@ -1520,48 +1591,6 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   }
 
-  protected static class RecoverTransition implements
-      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, 
TaskAttemptStateInternal> {
-
-    @Override
-    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, 
TaskAttemptEvent taskAttemptEvent) {
-      TaskAttemptStateInternal endState = TaskAttemptStateInternal.FAILED;
-      switch(taskAttempt.recoveredState) {
-        case NEW:
-        case RUNNING:
-          // FIXME once running containers can be recovered, this
-          // should be handled differently
-          // TODO abort taskattempt
-          taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
-              TaskEventType.T_ATTEMPT_KILLED));
-          
taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
-              getExternalState(TaskAttemptStateInternal.KILLED)));
-          
taskAttempt.logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState.KILLED);
-          endState = TaskAttemptStateInternal.KILLED;
-          break;
-        case SUCCEEDED:
-          // Do not inform Task as it already knows about completed attempts
-          endState = TaskAttemptStateInternal.SUCCEEDED;
-          break;
-        case FAILED:
-          // Do not inform Task as it already knows about completed attempts
-          endState = TaskAttemptStateInternal.FAILED;
-          break;
-        case KILLED:
-          // Do not inform Task as it already knows about completed attempts
-          endState = TaskAttemptStateInternal.KILLED;
-          break;
-        default:
-          throw new RuntimeException("Failed to recover from non-handled state"
-              + ", taskAttemptId=" + taskAttempt.getID()
-              + ", state=" + taskAttempt.recoveredState);
-      }
-
-      return endState;
-    }
-
-  }
-
   protected static class TerminatedAfterSuccessTransition implements
       MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, 
TaskAttemptStateInternal> {
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2f304c8..55dd518 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -57,6 +57,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
@@ -68,12 +69,9 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -84,9 +82,6 @@ import 
org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -138,6 +133,8 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
   long scheduledTime;
   final StateChangeNotifier stateChangeNotifier;
 
+  private final TaskRecoveryData recoveryData;
+
   private final List<TezEvent> tezEventsForTaskAttempts = new 
ArrayList<TezEvent>();
   static final ArrayList<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
       new ArrayList(0);
@@ -150,8 +147,6 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
 
-  // Recovery related flags
-  boolean recoveryStartEventSeen = false;
 
   private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new 
TaskStateChangedCallback();
   
@@ -164,20 +159,14 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
     // define the state machine of Task
 
     // Transitions from NEW state
-    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
+    // Stay in NEW in recovery when Task is killed in the previous AM          
 
+    .addTransition(TaskStateInternal.NEW,
+        EnumSet.of(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED),
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
         TaskEventType.T_TERMINATE,
         new KillNewTransition())
 
-    // Recover transition
-    .addTransition(TaskStateInternal.NEW,
-        EnumSet.of(TaskStateInternal.NEW,
-            TaskStateInternal.SCHEDULED,
-            TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
-            TaskStateInternal.FAILED, TaskStateInternal.KILLED),
-        TaskEventType.T_RECOVER, new RecoverTransition())
-
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
@@ -191,6 +180,11 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
+     // Happens in recovery   
+     .addTransition(TaskStateInternal.SCHEDULED,
+        EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED),
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new AttemptSucceededTransition())
 
     // When current attempt fails/killed and new attempt launched then
     // TODO Task should go back to SCHEDULED state TEZ-495
@@ -199,7 +193,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
+    .addTransition(TaskStateInternal.RUNNING, 
+        EnumSet.of(TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_SUCCEEDED,
         new AttemptSucceededTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
@@ -327,7 +322,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   int failedAttempts;
 
   private final boolean leafVertex;
-  private TaskState recoveredState = TaskState.NEW;
 
   @Override
   public TaskState getState() {
@@ -366,6 +360,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.leafVertex = leafVertex;
     this.taskResource = resource;
     this.containerContext = containerContext;
+    this.recoveryData = appContext.getDAGRecoveryData() == null ?
+        null : appContext.getDAGRecoveryData().getTaskRecoveryData(taskId);
     stateMachine = new StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>(
         stateMachineFactory.make(this), this);
     augmentStateMachine();
@@ -545,122 +541,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) {
-    TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId(), null);
-    return taskAttempt;
-  }
-
-  @Override
-  public TaskState restoreFromEvent(HistoryEvent historyEvent) {
-    writeLock.lock();
-    try {
-      switch (historyEvent.getEventType()) {
-        case TASK_STARTED:
-        {
-          TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
-          recoveryStartEventSeen = true;
-          this.scheduledTime = tEvent.getScheduledTime();
-          if (this.attempts == null
-              || this.attempts.isEmpty()) {
-            this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
-          }
-          recoveredState = TaskState.SCHEDULED;
-          taskAttemptStatus.clear();
-          return recoveredState;
-        }
-        case TASK_FINISHED:
-        {
-          TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
-          if (!recoveryStartEventSeen
-              && !tEvent.getState().equals(TaskState.KILLED)) {
-            throw new TezUncheckedException("Finished Event seen but"
-                + " no Started Event was encountered earlier"
-                + ", taskId=" + taskId
-                + ", finishState=" + tEvent.getState());
-          }
-          recoveredState = tEvent.getState();
-          if (tEvent.getState() == TaskState.SUCCEEDED
-              && tEvent.getSuccessfulAttemptID() != null) {
-            successfulAttempt = tEvent.getSuccessfulAttemptID();
-          }
-          return recoveredState;
-        }
-        case TASK_ATTEMPT_STARTED:
-        {
-          TaskAttemptStartedEvent taskAttemptStartedEvent =
-              (TaskAttemptStartedEvent) historyEvent;
-          TaskAttempt recoveredAttempt = createRecoveredTaskAttempt(
-              taskAttemptStartedEvent.getTaskAttemptID());
-          recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding restored attempt into known attempts map"
-                + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
-          }
-          Preconditions.checkArgument(this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
-              recoveredAttempt) == null, taskAttemptStartedEvent.getTaskAttemptID() + " already existed.");
-          this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false);
-          this.recoveredState = TaskState.RUNNING;
-          return recoveredState;
-        }
-        case TASK_ATTEMPT_FINISHED:
-        {
-          TaskAttemptFinishedEvent taskAttemptFinishedEvent =
-              (TaskAttemptFinishedEvent) historyEvent;
-          TaskAttempt taskAttempt = this.attempts.get(
-              taskAttemptFinishedEvent.getTaskAttemptID());
-          this.taskAttemptStatus.put(taskAttemptFinishedEvent.getTaskAttemptID().getId(), true);
-          if (taskAttempt == null) {
-            LOG.warn("Received an attempt finished event for an attempt that "
-                + " never started or does not exist"
-                + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
-                + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
-            TaskAttempt recoveredAttempt = createRecoveredTaskAttempt(
-                taskAttemptFinishedEvent.getTaskAttemptID());
-            this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
-                recoveredAttempt);
-            // Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
-            if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)
-                && !taskAttemptFinishedEvent.getState().equals(TaskAttemptState.FAILED)) {
-              throw new TezUncheckedException("Could not find task attempt"
-                  + " when trying to recover"
-                  + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
-                  + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
-            }
-            taskAttempt = recoveredAttempt;
-          }
-          if (getUncompletedAttemptsCount() < 0) {
-            throw new TezUncheckedException("Invalid recovery event for attempt finished"
-                + ", more completions than starts encountered"
-                + ", taskId=" + taskId
-                + ", finishedAttempts=" + getFinishedAttemptsCount()
-                + ", incompleteAttempts=" + getUncompletedAttemptsCount());
-          }
-          TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
-              taskAttemptFinishedEvent);
-          if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {
-            recoveredState = TaskState.SUCCEEDED;
-            successfulAttempt = taskAttempt.getID();
-          } else if (taskAttemptState.equals(TaskAttemptState.FAILED)){
-            failedAttempts++;
-            getVertex().incrementFailedTaskAttemptCount();
-            successfulAttempt = null;
-            recoveredState = TaskState.RUNNING; // reset to RUNNING, may fail after SUCCEEDED
-          } else if (taskAttemptState.equals(TaskAttemptState.KILLED)) {
-            successfulAttempt = null;
-            getVertex().incrementKilledTaskAttemptCount();
-            recoveredState = TaskState.RUNNING; // reset to RUNNING, may been killed after SUCCEEDED
-          }
-          return recoveredState;
-        }
-        default:
-          throw new RuntimeException("Unexpected event received for restoring"
-              + " state, eventType=" + historyEvent.getEventType());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
   @VisibleForTesting
   public TaskStateInternal getInternalState() {
     readLock.lock();
@@ -1046,17 +926,39 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private static class InitialScheduleTransition
-    implements SingleArcTransition<TaskImpl, TaskEvent> {
+    implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
-    public void transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+      if (task.recoveryData != null) {
+        TaskStartedEvent tStartedEvent = task.recoveryData.getTaskStartedEvent();
+        TaskFinishedEvent tFinishedEvent = task.recoveryData.getTaskFinishedEvent();
+        // If TaskStartedEvent is not seen but TaskFinishedEvent is seen, that means 
+        // Task is killed before it is started. Just send T_TERMINATE to itself to move to KILLED
+        if (tStartedEvent == null
+            && tFinishedEvent != null) {
+          Preconditions.checkArgument(tFinishedEvent.getState() == TaskState.KILLED,
+              "TaskStartedEvent is not seen, but TaskFinishedEvent is seen and with invalid state="
+                  + tFinishedEvent.getState() + ", taskId=" + task.getTaskId());
+          // TODO (TEZ-2938)
+          // use tFinishedEvent.getTerminationCause after adding TaskTerminationCause to TaskFinishedEvent
+          task.eventHandler.handle(new TaskEventTermination(task.taskId,
+              TaskAttemptTerminationCause.UNKNOWN_ERROR, tFinishedEvent.getDiagnostics(), true));
+          return TaskStateInternal.NEW;
+        }
+      } else {
+        task.scheduledTime = task.clock.getTime();
+        task.logJobHistoryTaskStartedEvent();
+      }
+      // No matter whether it is in recovery or normal execution, always schedule new task attempt.
+      // TaskAttempt will continue the recovery if necessary and send task attempt status
+      // to this Task.
       TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event;
       task.locationHint = scheduleEvent.getTaskLocationHint();
       task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
       // For now, initial scheduling dependency is due to vertex manager scheduling
       task.addAndScheduleAttempt(null);
-      task.scheduledTime = task.clock.getTime();
-      task.logJobHistoryTaskStartedEvent();
+      return TaskStateInternal.SCHEDULED;
     }
   }
 
@@ -1085,10 +987,67 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
 
   private static class AttemptSucceededTransition
-      implements SingleArcTransition<TaskImpl, TaskEvent> {
+      implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+    private boolean recoverSuccessTaskAttempt(TaskImpl task) {
+      // Found successful attempt
+      // Recover data
+      boolean recoveredData = true;
+      if (task.getVertex().getOutputCommitters() != null
+          && !task.getVertex().getOutputCommitters().isEmpty()) {
+        for (Entry<String, OutputCommitter> entry
+            : task.getVertex().getOutputCommitters().entrySet()) {
+          LOG.info("Recovering data for task from previous DAG attempt"
+              + ", taskId=" + task.getTaskId()
+              + ", output=" + entry.getKey());
+          OutputCommitter committer = entry.getValue();
+          if (!committer.isTaskRecoverySupported()) {
+            LOG.info("Task recovery not supported by committer"
+                + ", failing task attempt"
+                + ", taskId=" + task.getTaskId()
+                + ", attemptId=" + task.successfulAttempt
+                + ", output=" + entry.getKey());
+            recoveredData = false;
+            break;
+          }
+          try {
+            committer.recoverTask(task.getTaskId().getId(),
+                task.appContext.getApplicationAttemptId().getAttemptId()-1);
+          } catch (Exception e) {
+            LOG.warn("Task recovery failed by committer"
+                + ", taskId=" + task.getTaskId()
+                + ", attemptId=" + task.successfulAttempt
+                + ", output=" + entry.getKey(), e);
+            recoveredData = false;
+            break;
+          }
+        }
+      }
+      return recoveredData;
+    }
+
     @Override
-    public void transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID();
+      // Try to recover the succeeded TaskAttempt. It may be not recoverable if has committer which don't support
+      // recovery. In that case just reschedule new attempt if numFailedAttempts does not exceeded maxFailedAttempts.
+      if (task.recoveryData!= null
+          && task.recoveryData.isTaskAttemptSucceeded(successTaId)) {
+        boolean recoveredData = recoverSuccessTaskAttempt(task);
+        if (!recoveredData) {
+          // Move this TA to KILLED (TEZ-2958)
+          LOG.info("Can not recovery the successful task attempt, schedule new task attempt,"
+              + "taskId=" + task.getTaskId());
+          task.successfulAttempt = null;
+          task.addAndScheduleAttempt(successTaId);
+          return TaskStateInternal.RUNNING;
+        } else {
+          task.successfulAttempt = successTaId;
+          LOG.info("Recovered a successful attempt"
+              + ", taskAttemptId=" + task.successfulAttempt.toString());
+        }
+      }
+      // both recovery to succeeded and normal dag succeeded go here.
       if (task.commitAttempt != null &&
           !task.commitAttempt.equals(successTaId)) {
         // The succeeded attempt is not the one that was selected to commit
@@ -1136,7 +1095,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.eventHandler.handle(new DAGEventSchedulerUpdate(
           DAGEventSchedulerUpdate.UpdateType.TA_SUCCEEDED, task.attempts
               .get(task.successfulAttempt)));
-      task.finished(TaskStateInternal.SUCCEEDED);
+      return task.finished(TaskStateInternal.SUCCEEDED);
     }
   }
 
@@ -1162,139 +1121,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static class RecoverTransition implements
-      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
-
-    @Override
-    public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
-      if (taskEvent instanceof TaskEventRecoverTask) {
-        TaskEventRecoverTask taskEventRecoverTask =
-            (TaskEventRecoverTask) taskEvent;
-        if (taskEventRecoverTask.getDesiredState() != null
-            && !taskEventRecoverTask.recoverData()) {
-          // TODO recover attempts if desired state is given?
-          // History may not have all data.
-          switch (taskEventRecoverTask.getDesiredState()) {
-            case SUCCEEDED:
-              return TaskStateInternal.SUCCEEDED;
-            case FAILED:
-              return TaskStateInternal.FAILED;
-            case KILLED:
-              return TaskStateInternal.KILLED;
-          }
-        }
-      }
-
-      TaskStateInternal endState = TaskStateInternal.NEW;
-      if (task.attempts != null) {
-        for (TaskAttempt taskAttempt : task.attempts.values()) {
-          task.eventHandler.handle(new TaskAttemptEvent(
-              taskAttempt.getID(), TaskAttemptEventType.TA_RECOVER));
-        }
-      }
-      LOG.info("Trying to recover task"
-          + ", taskId=" + task.getTaskId()
-          + ", recoveredState=" + task.recoveredState);
-      switch(task.recoveredState) {
-        case NEW:
-          // Nothing to do until the vertex schedules this task
-          endState = TaskStateInternal.NEW;
-          break;
-        case SCHEDULED:
-        case RUNNING:
-        case SUCCEEDED:
-          if (task.successfulAttempt != null) {
-            //Found successful attempt
-            //Recover data
-            boolean recoveredData = true;
-            if (task.getVertex().getOutputCommitters() != null
-                && !task.getVertex().getOutputCommitters().isEmpty()) {
-              for (Entry<String, OutputCommitter> entry
-                  : task.getVertex().getOutputCommitters().entrySet()) {
-                LOG.info("Recovering data for task from previous DAG attempt"
-                    + ", taskId=" + task.getTaskId()
-                    + ", output=" + entry.getKey());
-                OutputCommitter committer = entry.getValue();
-                if (!committer.isTaskRecoverySupported()) {
-                  LOG.info("Task recovery not supported by committer"
-                      + ", failing task attempt"
-                      + ", taskId=" + task.getTaskId()
-                      + ", attemptId=" + task.successfulAttempt
-                      + ", output=" + entry.getKey());
-                  recoveredData = false;
-                  break;
-                }
-                try {
-                  committer.recoverTask(task.getTaskId().getId(),
-                      task.appContext.getApplicationAttemptId().getAttemptId()-1);
-                } catch (Exception e) {
-                  LOG.warn("Task recovery failed by committer"
-                      + ", taskId=" + task.getTaskId()
-                      + ", attemptId=" + task.successfulAttempt
-                      + ", output=" + entry.getKey(), e);
-                  recoveredData = false;
-                  break;
-                }
-              }
-            }
-            if (!recoveredData) {
-              task.successfulAttempt = null;
-            } else {
-              LOG.info("Recovered a successful attempt"
-                  + ", taskAttemptId=" + task.successfulAttempt.toString());
-              task.logJobHistoryTaskFinishedEvent();
-              task.eventHandler.handle(
-                  new VertexEventTaskCompleted(task.taskId,
-                      getExternalState(TaskStateInternal.SUCCEEDED)));
-              task.eventHandler.handle(
-                  new VertexEventTaskAttemptCompleted(
-                      task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
-              endState = TaskStateInternal.SUCCEEDED;
-              break;
-            }
-          }
-
-          if (endState != TaskStateInternal.SUCCEEDED &&
-              task.failedAttempts >= task.maxFailedAttempts) {
-            // Exceeded max attempts
-            task.finished(TaskStateInternal.FAILED);
-            endState = TaskStateInternal.FAILED;
-            break;
-          }
-
-          // no successful attempt and all attempts completed
-          // schedule a new one
-          // If any incomplete, the running attempt will moved to failed and its
-          // update will trigger a new attempt if possible
-          if (task.attempts.size() == task.getFinishedAttemptsCount()) {
-            task.addAndScheduleAttempt(null);
-          }
-          endState = TaskStateInternal.RUNNING;
-          break;
-        case KILLED:
-          // Nothing to do
-          // Inform vertex
-          task.eventHandler.handle(
-              new VertexEventTaskCompleted(task.taskId,
-                  getExternalState(TaskStateInternal.KILLED)));
-          endState  = TaskStateInternal.KILLED;
-          break;
-        case FAILED:
-          // Nothing to do
-          // Inform vertex
-          task.eventHandler.handle(
-              new VertexEventTaskCompleted(task.taskId,
-                  getExternalState(TaskStateInternal.FAILED)));
-
-          endState = TaskStateInternal.FAILED;
-          break;
-      }
-
-      return endState;
-    }
-  }
-
-
   private static class KillWaitAttemptCompletedTransition implements
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
@@ -1486,7 +1312,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       TaskEventTermination terminateEvent = (TaskEventTermination)event;
       task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
-      task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
+      if (terminateEvent.isFromRecovery()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered to KILLED, taskId=" + task.getTaskId());
+        }
+      } else {
+        task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
+      }
       task.eventHandler.handle(
           new VertexEventTaskCompleted(task.taskId, TaskState.KILLED));
       // TODO Metrics

Reply via email to