http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5b75179..758c637 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -17,7 +17,10 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -41,8 +44,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.Nullable;
 
-import com.google.common.base.Strings;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -64,8 +65,8 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -73,12 +74,15 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.Scope;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -94,6 +98,7 @@ import 
org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData;
 import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -113,10 +118,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -130,24 +132,20 @@ import 
org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
-import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.dag.impl.Edge.PendingEventRouteMetadata;
 import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
 import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -158,19 +156,19 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.InputStatistics;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
-import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.OutputStatistics;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
@@ -180,9 +178,14 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -192,11 +195,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.tez.state.OnStateChangedCallback;
-import org.apache.tez.state.StateMachineTez;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
  * The read and write calls use ReadWriteLock for concurrency.
  */
@@ -222,6 +220,10 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   //private final MRAppMetrics metrics;
   private final AppContext appContext;
   private final DAG dag;
+  private final VertexRecoveryData recoveryData;
+  private List<TezEvent> initGeneratedEvents = new ArrayList<TezEvent>();
+  // Set to true once setParallelism has been called (used for recovery).
+  private boolean setParallelismCalledFlag = false;
 
   private boolean lazyTasksCopyNeeded = false;
   // must be a linked map for ordering
@@ -281,11 +283,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK =
       new VertexStateChangedCallback();
 
-  private VertexState recoveredState = VertexState.NEW;
-  @VisibleForTesting
-  List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
-  private boolean vertexAlreadyInitialized = false;
-
   @VisibleForTesting
   final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
 
@@ -307,7 +304,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           .addTransition
               (VertexState.NEW,
                   EnumSet.of(VertexState.NEW, VertexState.INITED,
-                      VertexState.INITIALIZING, VertexState.FAILED),
+                      VertexState.INITIALIZING, VertexState.FAILED, 
VertexState.KILLED),
                   VertexEventType.V_INIT,
                   new InitTransition())
           .addTransition(VertexState.NEW,
@@ -324,21 +321,10 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
                 SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
               (VertexState.NEW,
-                  EnumSet.of(VertexState.NEW, VertexState.INITED,
-                      VertexState.INITIALIZING, VertexState.RUNNING,
+                  EnumSet.of(VertexState.NEW,
                       VertexState.SUCCEEDED, VertexState.FAILED,
-                      VertexState.KILLED, VertexState.ERROR,
-                      VertexState.RECOVERING),
+                      VertexState.KILLED, VertexState.ERROR),
                   VertexEventType.V_RECOVER,
-                  new StartRecoverTransition())
-          .addTransition
-              (VertexState.NEW,
-                  EnumSet.of(VertexState.NEW, VertexState.INITED,
-                      VertexState.INITIALIZING, VertexState.RUNNING,
-                      VertexState.SUCCEEDED, VertexState.FAILED,
-                      VertexState.KILLED, VertexState.ERROR,
-                      VertexState.RECOVERING),
-                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
                   new RecoverTransition())
           .addTransition(VertexState.NEW, VertexState.NEW,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
@@ -349,31 +335,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           .addTransition(VertexState.NEW, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          .addTransition
-              (VertexState.RECOVERING,
-                  EnumSet.of(VertexState.NEW, VertexState.INITED,
-                      VertexState.INITIALIZING, VertexState.RUNNING,
-                      VertexState.SUCCEEDED, VertexState.FAILED,
-                      VertexState.KILLED, VertexState.ERROR,
-                      VertexState.RECOVERING),
-                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
-                  new RecoverTransition())
-          .addTransition
-              (VertexState.RECOVERING, VertexState.RECOVERING,
-                  EnumSet.of(VertexEventType.V_INIT,
-                      VertexEventType.V_ROUTE_EVENT,
-                      VertexEventType.V_SOURCE_VERTEX_STARTED,
-                      VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED),
-                  new BufferDataRecoverTransition())
-          .addTransition
-              (VertexState.RECOVERING, VertexState.RECOVERING,
-                  VertexEventType.V_TERMINATE,
-                  new TerminateDuringRecoverTransition())
-          .addTransition
-              (VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING),
-                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
-                  new VertexManagerUserCodeErrorTransition())
-          
+
           // Transitions from INITIALIZING state
           .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
@@ -429,11 +391,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
               EnumSet.of(VertexState.FAILED),
               VertexEventType.V_ROOT_INPUT_FAILED,
               new RootInputInitFailedTransition())
-          .addTransition
-              (VertexState.INITED,
-                  EnumSet.of(VertexState.INITED, VertexState.ERROR),
-                  VertexEventType.V_INIT,
-                  new IgnoreInitInInitedTransition())
           .addTransition(VertexState.INITED, VertexState.INITED,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -618,8 +575,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
-                  VertexEventType.V_INPUT_DATA_INFORMATION,
-                  VertexEventType.V_SOURCE_VERTEX_RECOVERED))
+                  VertexEventType.V_INPUT_DATA_INFORMATION))
 
           // Transitions from KILLED state
           .addTransition(
@@ -641,8 +597,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
-                  VertexEventType.V_INPUT_DATA_INFORMATION,
-                  VertexEventType.V_SOURCE_VERTEX_RECOVERED))
+                  VertexEventType.V_INPUT_DATA_INFORMATION))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
@@ -662,8 +617,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
                   VertexEventType.V_INTERNAL_ERROR,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
-                  VertexEventType.V_INPUT_DATA_INFORMATION,
-                  VertexEventType.V_SOURCE_VERTEX_RECOVERED))
+                  VertexEventType.V_INPUT_DATA_INFORMATION))
           // create the topology tables
           .installTopology();
 
@@ -676,7 +630,9 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         .registerStateEnteredCallback(VertexState.KILLED,
             STATE_CHANGED_CALLBACK)
         .registerStateEnteredCallback(VertexState.RUNNING,
-            STATE_CHANGED_CALLBACK);
+            STATE_CHANGED_CALLBACK)
+        .registerStateEnteredCallback(VertexState.INITIALIZING,
+            STATE_CHANGED_CALLBACK);;
   }
 
   private final StateMachineTez<VertexState, VertexEventType, VertexEvent, 
VertexImpl> stateMachine;
@@ -785,18 +741,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   private VertexTerminationCause terminationCause;
 
   private String logIdentifier;
-  @VisibleForTesting
-  boolean recoveryCommitInProgress = false;
-  private boolean summaryCompleteSeen = false;
-  @VisibleForTesting
-  boolean hasCommitter = false;
-  private boolean vertexCompleteSeen = false;
-  private Map<String,EdgeProperty> recoveredSourceEdgeProperties = null;
-  private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null;
-
-  // Recovery related flags
-  boolean recoveryInitEventSeen = false;
-  boolean recoveryStartEventSeen = false;
+
   private VertexStats vertexStats = null;
 
   private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
@@ -945,7 +890,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getEnvironmentSettingList());
     this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption;
-
+    this.recoveryData = appContext.getDAGRecoveryData() == null ?
+        null : appContext.getDAGRecoveryData().getVertexRecoveryData(vertexId);
     // Set up log properties, including task specific log properties.
     String javaOptsWithoutLoggerMods =
         vertexPlan.getTaskConfig().hasJavaOpts() ? 
vertexPlan.getTaskConfig().getJavaOpts() : null;
@@ -1440,110 +1386,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     return this.appContext;
   }
 
-  private void handleParallelismUpdate(int newParallelism,
-      Map<String, EdgeProperty> sourceEdgeProperties,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) {
-    // initial parallelism must have been set by this time
-    // parallelism update is recorded in history only for change from an 
initialized value
-    Preconditions.checkArgument(oldParallelism != -1, getLogIdentifier());
-    if (oldParallelism < newParallelism) {
-      addTasks(newParallelism);
-    } else if (oldParallelism > newParallelism) {
-      removeTasks(newParallelism);
-    }
-    Preconditions.checkState(this.numTasks == newParallelism, 
getLogIdentifier());
-    this.recoveredSourceEdgeProperties = sourceEdgeProperties;
-    this.recoveredRootInputSpecUpdates = rootInputSpecUpdates;
-  }
-
-  @Override
-  public VertexState restoreFromEvent(HistoryEvent historyEvent) {
-    writeLock.lock();
-    try {
-      switch (historyEvent.getEventType()) {
-        case VERTEX_INITIALIZED:
-          recoveryInitEventSeen = true;
-          recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
-          createTasks();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Recovered state for vertex after Init event"
-                + ", vertex=" + logIdentifier
-                + ", recoveredState=" + recoveredState);
-          }
-          return recoveredState;
-        case VERTEX_STARTED:
-          if (!recoveryInitEventSeen) {
-            throw new RuntimeException("Started Event seen but"
-                + " no Init Event was encountered earlier");
-          }
-          recoveryStartEventSeen = true;
-          VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
-          startTimeRequested = startedEvent.getStartRequestedTime();
-          startedTime = startedEvent.getStartTime();
-          recoveredState = VertexState.RUNNING;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Recovered state for vertex after Started event"
-                + ", vertex=" + logIdentifier
-                + ", recoveredState=" + recoveredState);
-          }
-          return recoveredState;
-        case VERTEX_PARALLELISM_UPDATED:
-          // TODO TEZ-1019 this should flow through setParallelism method
-          VertexParallelismUpdatedEvent updatedEvent =
-              (VertexParallelismUpdatedEvent) historyEvent;
-          int oldNumTasks = numTasks;
-          int newNumTasks = updatedEvent.getNumTasks();
-          handleParallelismUpdate(newNumTasks, 
updatedEvent.getSourceEdgeProperties(),
-            updatedEvent.getRootInputSpecUpdates(), oldNumTasks);
-          Preconditions.checkState(this.numTasks == newNumTasks, 
getLogIdentifier());
-          if (updatedEvent.getVertexLocationHint() != null) {
-            setVertexLocationHint(updatedEvent.getVertexLocationHint());
-          }
-          stateChangeNotifier.stateChanged(vertexId,
-              new VertexStateUpdateParallelismUpdated(vertexName, numTasks, 
oldNumTasks));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Recovered state for vertex after parallelism updated 
event"
-                + ", vertex=" + logIdentifier
-                + ", recoveredState=" + recoveredState);
-          }
-          return recoveredState;
-        case VERTEX_COMMIT_STARTED:
-          recoveryCommitInProgress = true;
-          hasCommitter = true;
-          return recoveredState;
-        case VERTEX_FINISHED:
-          VertexFinishedEvent finishedEvent = (VertexFinishedEvent) 
historyEvent;
-          if (finishedEvent.isFromSummary()) {
-            summaryCompleteSeen  = true;
-          } else {
-            vertexCompleteSeen = true;
-          }
-          numTasks = finishedEvent.getNumTasks();
-          recoveryCommitInProgress = false;
-          recoveredState = finishedEvent.getState();
-          diagnostics.add(finishedEvent.getDiagnostics());
-          finishTime = finishedEvent.getFinishTime();
-          // TODO counters ??
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Recovered state for vertex after finished event"
-                + ", vertex=" + logIdentifier
-                + ", recoveredState=" + recoveredState);
-          }
-          return recoveredState;
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          VertexRecoverableEventsGeneratedEvent vEvent =
-              (VertexRecoverableEventsGeneratedEvent) historyEvent;
-          this.recoveredEvents.addAll(vEvent.getTezEvents());
-          return recoveredState;
-        default:
-          throw new RuntimeException("Unexpected event received for restoring"
-              + " state, eventType=" + historyEvent.getEventType());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
   @Override
   public String getLogIdentifier() {
     return this.logIdentifier;
@@ -1613,7 +1455,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         if (!pendingTaskEvents.isEmpty()) {
           LOG.info("Routing pending task events for vertex: " + logIdentifier);
           try {
-            handleRoutedTezEvents(pendingTaskEvents, false, true);
+            handleRoutedTezEvents(pendingTaskEvents, true);
           } catch (AMUserCodeException e) {
             String msg = "Exception in " + e.getSource() + ", vertex=" + 
logIdentifier;
             LOG.error(msg, e);
@@ -1667,8 +1509,9 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         for (ScheduleTaskRequest task : tasksToSchedule) {
           TezTaskID taskId = TezTaskID.getInstance(vertexId, 
task.getTaskIndex());
           TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
+          boolean fromRecovery = recoveryData == null ? false : 
recoveryData.getTaskRecoveryData(taskId) != null;
           eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
-              getTaskLocationHint(taskId)));
+              getTaskLocationHint(taskId), fromRecovery));
         }
       } finally {
         readLock.unlock();
@@ -1687,22 +1530,22 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   public void reconfigureVertex(int parallelism,
       @Nullable VertexLocationHint locationHint,
       @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws 
AMUserCodeException {
-    setParallelism(parallelism, locationHint, sourceEdgeProperties, null, 
false, true);
+    setParallelismWrapper(parallelism, locationHint, sourceEdgeProperties, 
null, true);
   }
   
   @Override
   public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> 
rootInputSpecUpdate,
       int parallelism,
       @Nullable VertexLocationHint locationHint) throws AMUserCodeException {
-    setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, 
false, true);
+    setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, true);
   }
-  
+
   @Override
   public void reconfigureVertex(int parallelism,
       @Nullable VertexLocationHint locationHint,
       @Nullable Map<String, EdgeProperty> sourceEdgeProperties,
       @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate) throws 
AMUserCodeException {
-    setParallelism(parallelism, locationHint, sourceEdgeProperties, 
rootInputSpecUpdate, false, true);
+    setParallelismWrapper(parallelism, locationHint, sourceEdgeProperties, 
rootInputSpecUpdate, true);
   }
   
   @Override
@@ -1730,49 +1573,18 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     } finally {
       readLock.unlock();
     }
-    setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties, 
rootInputSpecUpdates,
-        false, fromVertexManager);
+    setParallelismWrapper(parallelism, vertexLocationHint, 
sourceEdgeProperties, rootInputSpecUpdates,
+        fromVertexManager);
   }
 
-  private void setParallelism(int parallelism, VertexLocationHint 
vertexLocationHint,
+  private void setParallelismWrapper(int parallelism, VertexLocationHint 
vertexLocationHint,
       Map<String, EdgeProperty> sourceEdgeProperties,
       Map<String, InputSpecUpdate> rootInputSpecUpdates,
-      boolean recovering, boolean fromVertexManager) throws 
AMUserCodeException {
-    if (recovering) {
-      writeLock.lock();
-      try {
-        if (sourceEdgeProperties != null) {
-          for(Map.Entry<String, EdgeProperty> entry :
-            sourceEdgeProperties.entrySet()) {
-            LOG.info("Recovering edge manager for source:"
-                + entry.getKey() + " destination: " + getLogIdentifier());
-            Vertex sourceVertex = 
appContext.getCurrentDAG().getVertex(entry.getKey());
-            Edge edge = sourceVertices.get(sourceVertex);
-            try {
-              edge.setEdgeProperty(entry.getValue());
-            } catch (Exception e) {
-              throw new TezUncheckedException("Fail to setCustomEdgeManage for 
Edge,"
-                  + "sourceVertex:" + edge.getSourceVertexName()
-                  + "destinationVertex:" + edge.getDestinationVertexName(), e);
-            }
-          }
-        }
-
-        // Restore any rootInputSpecUpdates which may have been registered 
during a parallelism
-        // update.
-        if (rootInputSpecUpdates != null) {
-          LOG.info("Got updated RootInputsSpecs during recovery: " + 
rootInputSpecUpdates.toString());
-          this.rootInputSpecs.putAll(rootInputSpecUpdates);
-        }
-        return;
-      } finally {
-        writeLock.unlock();
-      }
-    }
+      boolean fromVertexManager) throws AMUserCodeException {
     Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. 
Value: " + parallelism
         + " for vertex: " + logIdentifier);
     writeLock.lock();
-
+    this.setParallelismCalledFlag = true;
     try {
       // disallow changing things after a vertex has started
       if (!tasksNotYetScheduled) {
@@ -1791,7 +1603,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
                 vertexToBeReconfiguredByManager,
                 "Vertex is fully configured but still"
                     + " the reconfiguration API has been called. VertexManager 
must notify the framework using "
-                    + " context.vertexReconfigurationPlanned() before 
re-configuring the vertex.");
+                    + " context.vertexReconfigurationPlanned() before 
re-configuring the vertex."
+                    + " vertexId=" + logIdentifier);
       }
       
       // Input initializer/Vertex Manager/1-1 split expected to set 
parallelism.
@@ -1875,7 +1688,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
             removeTasks(parallelism);
           }
         }
-
         Preconditions.checkState(this.numTasks == parallelism, 
getLogIdentifier());
         
         // set new vertex location hints
@@ -1903,13 +1715,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           }
         }
 
-        // update history
-        VertexParallelismUpdatedEvent parallelismUpdatedEvent = new 
VertexParallelismUpdatedEvent(
-            vertexId, numTasks, vertexLocationHint, sourceEdgeProperties, 
rootInputSpecUpdates,
-            oldNumTasks);
-        appContext.getHistoryHandler().handle(
-            new DAGHistoryEvent(getDAGId(), parallelismUpdatedEvent));
-
         // stop buffering events
         for (Edge edge : sourceVertices.values()) {
           edge.stopEventBuffering();
@@ -1961,6 +1766,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         Preconditions.checkState(getInternalState() == 
VertexState.INITIALIZING, "Vertex: "
             + getLogIdentifier());
       }
+      
     } finally {
       writeLock.unlock();
     }    
@@ -2034,28 +1840,52 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
 
 
   void logJobHistoryVertexInitializedEvent() {
+    // TODO: Vertex init may happen multiple times, so multiple VertexInitializedEvents may be logged.
     VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, 
vertexName,
         initTimeRequested, initedTime, numTasks,
-        getProcessorName(), getAdditionalInputs());
+        getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), initEvt));
   }
 
   void logJobHistoryVertexStartedEvent() {
-    VertexStartedEvent startEvt = new VertexStartedEvent(vertexId,
-        startTimeRequested, startedTime);
-    this.appContext.getHistoryHandler().handle(
-        new DAGHistoryEvent(getDAGId(), startEvt));
+    if (recoveryData == null
+        || !recoveryData.isVertexStarted()) {
+      VertexStartedEvent startEvt = new VertexStartedEvent(vertexId,
+          startTimeRequested, startedTime);
+      this.appContext.getHistoryHandler().handle(
+          new DAGHistoryEvent(getDAGId(), startEvt));
+    }
+  }
+
+  void logVertexConfigurationDoneEvent() {
+    if (recoveryData == null || !recoveryData.shouldSkipInit()) {
+      Map<String, EdgeProperty> sourceEdgeProperties = new HashMap<String, 
EdgeProperty>();
+      for (Map.Entry<Vertex, Edge> entry : this.sourceVertices.entrySet()) {
+        sourceEdgeProperties.put(entry.getKey().getName(), 
entry.getValue().getEdgeProperty());
+      }
+      VertexConfigurationDoneEvent reconfigureDoneEvent =
+          new VertexConfigurationDoneEvent(vertexId, clock.getTime(),
+              numTasks, taskLocationHints == null ? null : 
VertexLocationHint.create(Lists.newArrayList(taskLocationHints)),
+                  sourceEdgeProperties, rootInputSpecs, 
setParallelismCalledFlag);
+      this.appContext.getHistoryHandler().handle(
+          new DAGHistoryEvent(getDAGId(), reconfigureDoneEvent));
+    }
   }
 
   void logJobHistoryVertexFinishedEvent() throws IOException {
-    this.setFinishTime();
-    logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "");
+    if (recoveryData == null
+        || !recoveryData.isVertexSucceeded()) {
+      logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, 
"");
+    }
   }
 
   void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
-    logJobHistoryVertexCompletedHelper(state, clock.getTime(),
-        StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
+    if (recoveryData == null
+        || !recoveryData.isVertexFinished()) {
+      logJobHistoryVertexCompletedHelper(state, clock.getTime(),
+          StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
+    }
   }
 
   private void logJobHistoryVertexCompletedHelper(VertexState finalState, long 
finishTime,
@@ -2079,6 +1909,12 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     // commit only once. Dont commit shared outputs
     if (vertex.outputCommitters != null
         && !vertex.outputCommitters.isEmpty()) {
+      if (vertex.recoveryData != null
+          && vertex.recoveryData.isVertexCommitted()) {
+        LOG.info("Vertex was already committed as per recovery"
+            + " data, vertex=" + vertex.logIdentifier);
+        return vertex.finished(VertexState.SUCCEEDED);
+      }
       boolean firstCommit = true;
       for (Entry<String, OutputCommitter> entry : 
vertex.outputCommitters.entrySet()) {
         final OutputCommitter committer = entry.getValue();
@@ -2354,19 +2190,31 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
   private boolean initializeVertex() {
-    try {
-      initializeCommitters();
-    } catch (Exception e) {
-      LOG.warn("Vertex Committer init failed, vertex=" + logIdentifier, e);
-      addDiagnostic("Vertex init failed : "
-          + ExceptionUtils.getStackTrace(e));
-      trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
-      finished(VertexState.FAILED);
-      return false;
+    // Don't need to initialize committer if vertex is fully completed
+    if (recoveryData != null && recoveryData.shouldSkipInit()) {
+      // Do other necessary recovery here
+      initedTime = recoveryData.getVertexInitedEvent().getInitedTime();
+      List<TezEvent> initGeneratedEvents = 
recoveryData.getVertexInitedEvent().getInitGeneratedEvents();
+      if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) {
+        eventHandler.handle(new VertexEventRouteEvent(getVertexId(), 
initGeneratedEvents));
+      }
+    } else {
+      initedTime = clock.getTime();
+    }
+    // Only initialize committers in non-recovery mode, or in recovery mode when the vertex has
+    // not been recovered to a completed state.
+    if (recoveryData == null || recoveryData.getVertexFinishedEvent() == null) 
{
+      try {
+        initializeCommitters();
+      } catch (Exception e) {
+        LOG.warn("Vertex Committer init failed, vertex=" + logIdentifier, e);
+        addDiagnostic("Vertex init failed : "
+            + ExceptionUtils.getStackTrace(e));
+        trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+        finished(VertexState.FAILED);
+        return false;
+      }
     }
-
-    // TODO: Metrics
-    initedTime = clock.getTime();
 
     logJobHistoryVertexInitializedEvent();
     return true;
@@ -2470,23 +2318,14 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
-  private VertexState setupVertex() {
-    return setupVertex(null);
-  }
 
-  private VertexState setupVertex(VertexInitializedEvent event) {
+  private VertexState setupVertex() {
 
-    if (event == null) {
-      initTimeRequested = clock.getTime();
-    } else {
-      initTimeRequested = event.getInitRequestedTime();
-      initedTime = event.getInitedTime();
-    }
+    this.initTimeRequested = clock.getTime();
 
     // VertexManager needs to be setup before attempting to Initialize any
     // Inputs - since events generated by them will be routed to the
     // VertexManager for handling.
-
     if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
       List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
       for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
@@ -2502,24 +2341,20 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
 
     // Check if any inputs need initializers
-    if (event != null) {
-      this.rootInputDescriptors = event.getAdditionalInputs();
-    } else {
-      if (rootInputDescriptors != null) {
-        LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
-            + rootInputDescriptors);
-        for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
input
-            : rootInputDescriptors.values()) {
-          if (input.getControllerDescriptor() != null &&
-              input.getControllerDescriptor().getClassName() != null) {
-            if (inputsWithInitializers == null) {
-              inputsWithInitializers = Sets.newHashSet();
-            }
-            inputsWithInitializers.add(input.getName());
-            LOG.info("Starting root input initializer for input: "
-                + input.getName() + ", with class: ["
-                + input.getControllerDescriptor().getClassName() + "]");
+    if (rootInputDescriptors != null) {
+      LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+          + rootInputDescriptors);
+      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
input
+          : rootInputDescriptors.values()) {
+        if (input.getControllerDescriptor() != null &&
+            input.getControllerDescriptor().getClassName() != null) {
+          if (inputsWithInitializers == null) {
+            inputsWithInitializers = Sets.newHashSet();
           }
+          inputsWithInitializers.add(input.getName());
+          LOG.info("Starting root input initializer for input: "
+              + input.getName() + ", with class: ["
+              + input.getControllerDescriptor().getClassName() + "]");
         }
       }
     }
@@ -2536,13 +2371,21 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     if (hasBipartite && inputsWithInitializers != null) {
       LOG.error("A vertex with an Initial Input and a Shuffle Input are not 
supported at the moment");
-      if (event != null) {
-        return VertexState.FAILED;
-      } else {
-        return finished(VertexState.FAILED);
-      }
+      return finished(VertexState.FAILED);
+    }
+
+    numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+    if (!(numTasks == -1 || numTasks >= 0)) {
+      addDiagnostic("Invalid task count for vertex"
+          + ", numTasks=" + numTasks);
+      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
+      return VertexState.FAILED;
     }
 
+    checkTaskLimits();
+    // Set the VertexManager as the last step, because in the recovery case we may need to restore
+    // some info from the last AM attempt and skip the initialization step. Otherwise numTasks may be
+    // reset to -1 after the restore.
     try {
       assignVertexManager();
     } catch (TezException e1) {
@@ -2571,38 +2414,43 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           msg + ", " + e.getMessage() + ", " + 
ExceptionUtils.getStackTrace(e.getCause()));
       return VertexState.FAILED;
     }
-
-    // Setup tasks early if possible. If the VertexManager is not being used
-    // to set parallelism, sending events to Tasks is safe (and less confusing
-    // then relying on tasks to be created after TaskEvents are generated).
-    // For VertexManagers setting parallelism, the setParallelism call needs
-    // to be inline.
-    if (event != null) {
-      int oldNumTasks = numTasks;
-      numTasks = event.getNumTasks();
-      stateChangeNotifier.stateChanged(vertexId,
-          new VertexStateUpdateParallelismUpdated(vertexName, numTasks, 
oldNumTasks));
-    } else {
-      numTasks = getVertexPlan().getTaskConfig().getNumTasks();
-      // Not sending a parallelism update notification since this is from the 
original plan
-    }
-
-    if (!(numTasks == -1 || numTasks >= 0)) {
-      addDiagnostic("Invalid task count for vertex"
-          + ", numTasks=" + numTasks);
-      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
-      if (event != null) {
-        return finished(VertexState.FAILED);
-      } else {
-        return VertexState.FAILED;
-      }
-    }
-    
-    checkTaskLimits();
     return VertexState.INITED;
   }
 
   private void assignVertexManager() throws TezException {
+    // Conditions for skipping the initializing stage:
+    //   - VertexInputInitializerEvent is seen
+    //   - VertexReconfigureDoneEvent is seen
+    //        -  Reason to check whether the VertexManager has completed its responsibility:
+    //           the VertexManager is actually involved in input initialization (the InputInitializer generates
+    //           events and sends them to the VertexManager, which does some processing and sends them back to
+    //           the Vertex), so the InputInitializer affects the VertexManager and we cannot skip the initializing
+    //           step if the VertexManager has not completed its responsibility.
+    //        -  Why use VertexReconfigureDoneEvent:
+    //           -  VertexReconfigureDoneEvent represents the case where the user calls the reconfigureVertex API,
+    //              in which case a VertexReconfigureDoneEvent is logged.
+    if (recoveryData != null
+        && recoveryData.shouldSkipInit()) {
+      // Init is being skipped: replace the original VertexManager with a NoOpVertexManager whose payload is the recovered VertexConfigurationDoneEvent
+      VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("VertexManager reconfiguration is done in the last AM Attempt"
+            + ", use NoOpVertexManager to replace it, vertexId=" + logIdentifier);
+        LOG.debug("VertexReconfigureDoneEvent=" + reconfigureDoneEvent);
+      }
+      ByteArrayOutputStream out = new ByteArrayOutputStream(); // receives the serialized event that becomes the plugin payload below
+      try {
+        reconfigureDoneEvent.toProtoStream(out);
+      } catch (IOException e) {
+        throw new TezUncheckedException("Unable to serialize VertexConfigurationDoneEvent", e); // chain the cause so the root failure is not lost
+      }
+      this.vertexManager = new VertexManager(
+          VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName())
+            .setUserPayload(UserPayload.create(ByteBuffer.wrap(out.toByteArray()))),
+          dagUgi, this, appContext, stateChangeNotifier);
+      return;
+    }
+
     boolean hasBipartite = false;
     boolean hasOneToOne = false;
     boolean hasCustom = false;
@@ -2671,258 +2519,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
     }
   }
-
-  public static class StartRecoverTransition implements
-      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
-      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) 
vertexEvent;
-      VertexState desiredState = recoverEvent.getDesiredState();
-
-      switch (desiredState) {
-        case RUNNING:
-          break;
-        case SUCCEEDED:
-        case KILLED:
-        case FAILED:
-        case ERROR:
-          if (desiredState == VertexState.SUCCEEDED) {
-            vertex.succeededTaskCount = vertex.numTasks;
-            vertex.completedTaskCount = vertex.numTasks;
-          } else if (desiredState == VertexState.KILLED) {
-            vertex.killedTaskCount = vertex.numTasks;
-          } else if (desiredState == VertexState.FAILED || desiredState == 
VertexState.ERROR) {
-            vertex.failedTaskCount = vertex.numTasks;
-          }
-          if (vertex.tasks != null) {
-            TaskState taskState = TaskState.KILLED;
-            if (desiredState == VertexState.SUCCEEDED) {
-              taskState = TaskState.SUCCEEDED;
-            } else if (desiredState == VertexState.KILLED) {
-              taskState = TaskState.KILLED;
-            } else if (desiredState == VertexState.FAILED || desiredState == 
VertexState.ERROR) {
-              taskState = TaskState.FAILED;
-            }
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId(),
-                      taskState, false));
-            }
-          }
-          LOG.info("DAG informed Vertex of its final completed state"
-              + ", vertex=" + vertex.logIdentifier
-              + ", state=" + desiredState);
-          return desiredState;
-        default:
-          LOG.info("Unhandled desired state provided by DAG"
-              + ", vertex=" + vertex.logIdentifier
-              + ", state=" + desiredState);
-          vertex.finished(VertexState.ERROR);
-      }
-
-      // recover from recover log, should recover to running
-      // desiredState must be RUNNING based on above code
-      VertexState endState;
-      switch (vertex.recoveredState) {
-        case NEW:
-          // Trigger init and start as desired state is RUNNING
-          // Drop all root events
-          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
-          while (iterator.hasNext()) {
-            if (iterator.next().getEventType().equals(
-                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
-              iterator.remove();
-            }
-          }
-          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-              VertexEventType.V_INIT));
-          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-              VertexEventType.V_START));
-          endState = VertexState.NEW;
-          break;
-        case INITED:
-          try {
-            vertex.initializeCommitters();
-          } catch (Exception e) {
-            String msg = "Failed to initialize committers"
-                + ", vertex=" + vertex.logIdentifier + ","
-                + ExceptionUtils.getStackTrace(e);
-            LOG.error(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.INIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-
-          // Recover tasks
-          if (vertex.tasks != null) {
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId()));
-            }
-          }
-          // Update tasks with their input payloads as needed
-
-          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-              VertexEventType.V_START));
-          if (vertex.getInputVertices().isEmpty()) {
-            endState = VertexState.INITED;
-          } else {
-            endState = VertexState.RECOVERING;
-          }
-          break;
-        case RUNNING:
-          try {
-            vertex.initializeCommitters();
-          } catch (Exception e) {
-            String msg = "Failed to initialize committers"
-                + ", vertex=" + vertex.logIdentifier + ","
-                + ExceptionUtils.getStackTrace(e);
-            LOG.error(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.INIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-
-          // if commit in progress and desired state is not a succeeded one,
-          // move to failed
-          if (vertex.recoveryCommitInProgress) {
-            String msg = "Recovered vertex was in the middle of a commit"
-                + ", failing Vertex=" + vertex.logIdentifier;
-            LOG.warn(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.COMMIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-          assert vertex.tasks.size() == vertex.numTasks;
-          if (vertex.tasks != null && vertex.numTasks != 0) {
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId()));
-            }
-            try {
-              vertex.recoveryCodeSimulatingStart();
-              vertex.unsetTasksNotYetScheduled();
-              endState = VertexState.RUNNING;
-            } catch (AMUserCodeException e) {
-              String msg = "Exception in " + e.getSource() + ", vertex:" + 
vertex.getLogIdentifier();
-              LOG.error(msg, e);
-              vertex.finished(VertexState.FAILED, 
VertexTerminationCause.AM_USERCODE_FAILURE,
-                  msg + ", " + ExceptionUtils.getStackTrace(e.getCause()));
-              endState = VertexState.FAILED;
-            }
-          } else {
-            // why succeeded here
-            endState = VertexState.SUCCEEDED;
-            vertex.finished(endState);
-          }
-          break;
-        case SUCCEEDED:
-          if (vertex.hasCommitter
-              && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
-            String msg = "Cannot recover vertex as all recovery events not"
-                + " found, vertex=" + vertex.logIdentifier
-                + ", hasCommitters=" + vertex.hasCommitter
-                + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen
-                + ", finalCompletionSeen=" + vertex.vertexCompleteSeen;
-            LOG.warn(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.COMMIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-          } else {
-            // recover tasks
-            if (vertex.tasks != null && vertex.numTasks != 0) {
-              TaskState taskState = TaskState.SUCCEEDED;
-              for (Task task : vertex.tasks.values()) {
-                vertex.eventHandler.handle(
-                    new TaskEventRecoverTask(task.getTaskId(),
-                        taskState));
-              }
-              try {
-                vertex.recoveryCodeSimulatingStart();
-                vertex.unsetTasksNotYetScheduled();
-                endState = VertexState.RUNNING;
-              } catch (AMUserCodeException e) {
-                String msg = "Exception in " + e.getSource() +", vertex:" + 
vertex.getLogIdentifier();
-                LOG.error(msg, e);
-                vertex.finished(VertexState.FAILED, 
VertexTerminationCause.AM_USERCODE_FAILURE,
-                    msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
-                endState = VertexState.FAILED;
-              }
-            } else {
-              endState = vertex.recoveredState;
-              vertex.finished(endState);
-            }
-          }
-          break;
-        case FAILED:
-        case KILLED:
-          // vertex may be killed/failed before its tasks are scheduled. so 
here just recover vertex
-          // to the recovered state without waiting for its tasks' feedback 
and recover tasks to
-          // the corresponding state without recover its data.
-          if (vertex.tasks != null && vertex.numTasks != 0) {
-            TaskState taskState = TaskState.FAILED;
-            if (vertex.recoveredState == VertexState.KILLED) {
-              taskState = TaskState.KILLED;
-            }
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId(),
-                      taskState, false));
-            }
-          }
-          endState = vertex.recoveredState;
-          vertex.finished(endState);
-          break;
-        default:
-          LOG.warn("Invalid recoveredState found when trying to recover"
-              + " vertex"
-              + ", vertex=" + vertex.logIdentifier
-              + ", recoveredState=" + vertex.recoveredState);
-          vertex.finished(VertexState.ERROR);
-          endState = VertexState.ERROR;
-          break;
-      }
-      if (!endState.equals(VertexState.RECOVERING)) {
-        LOG.info("Recovered Vertex State"
-            + ", vertexId=" + vertex.logIdentifier
-            + ", state=" + endState
-            + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
-            + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
-            + ", numRecoveredSourceVertices=" + 
vertex.numRecoveredSourceVertices
-            + ", recoveredEvents="
-            + ( vertex.recoveredEvents == null ? "null" : 
vertex.recoveredEvents.size())
-            + ", tasksIsNull=" + (vertex.tasks == null)
-            + ", numTasks=" + ( vertex.tasks == null ? "null" : 
vertex.tasks.size()));
-        for (Entry<Vertex, Edge> entry : 
vertex.getOutputVertices().entrySet()) {
-          vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
-              entry.getKey().getVertexId(),
-              vertex.vertexId, endState, null,
-              vertex.getDistanceFromRoot()));
-        }
-      }
-      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, 
VertexState.INITED)
-          .contains(endState)) {
-        // Send events downstream
-        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
-        vertex.recoveredEvents.clear();
-      } else {
-        // Ensure no recovered events
-        if (!vertex.recoveredEvents.isEmpty()) {
-          throw new RuntimeException("Invalid Vertex state"
-              + ", found non-zero recovered events in invalid state"
-              + ", vertex=" + vertex.logIdentifier
-              + ", recoveredState=" + endState
-              + ", recoveredEvents=" + vertex.recoveredEvents.size());
-        }
-      }
-      return endState;
-    }
-
-  }
   
   private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG 
dag, 
       List<TezTaskAttemptID> taIds) {
@@ -2939,73 +2535,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       TezTaskAttemptID taId) {
     return new TaskAttemptIdentifierImpl(dagName, vertexName, taId);
   }
-  
-  private void recoveryCodeSimulatingStart() throws AMUserCodeException {
-    vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, 
pendingReportedSrcCompletions));
-    // This code is duplicated from startVertex() because recovery does not 
follow normal
-    // transitions. To be removed after recovery code is fixed.
-    maybeSendConfiguredEvent();
-  }
-
-  private void routeRecoveredEvents(VertexState vertexState,
-      List<TezEvent> tezEvents) {
-    for (TezEvent tezEvent : tezEvents) {
-      EventMetaData sourceMeta = tezEvent.getSourceInfo();
-      TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
-      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
-        ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
-      } else if (tezEvent.getEventType() == 
EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
-        ((CompositeDataMovementEvent) 
tezEvent.getEvent()).setVersion(srcTaId.getId());
-      } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
-        ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
-      } else if (tezEvent.getEventType() == 
EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
-        if (vertexState == VertexState.RUNNING
-            || vertexState == VertexState.INITED) {
-          // Only routed if vertex is still running
-          eventHandler.handle(new VertexEventRouteEvent(
-              this.getVertexId(), Collections.singletonList(tezEvent), true));
-        }
-        continue;
-      } else if (tezEvent.getEventType() == 
EventType.ROOT_INPUT_INITIALIZER_EVENT) {
-        // The event has the relevant target information
-        InputInitializerEvent iiEvent = (InputInitializerEvent) 
tezEvent.getEvent();
-        iiEvent.setSourceVertexName(vertexName);
-        eventHandler.handle(new VertexEventRouteEvent(
-            getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(),
-            Collections.singletonList(tezEvent), true));
-        continue;
-      }
-
-      Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
-      Edge destEdge = targetVertices.get(destVertex);
-      if (destEdge == null) {
-        throw new TezUncheckedException("Bad destination vertex: " +
-            sourceMeta.getEdgeVertexName() + " for event vertex: " +
-            getLogIdentifier());
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Routing recovered event"
-            + ", vertex=" + logIdentifier
-            + ", eventType=" + tezEvent.getEventType()
-            + ", sourceInfo=" + sourceMeta
-            + ", destinationVertex=" + destVertex.getLogIdentifier());
-      }
-      eventHandler.handle(new VertexEventRouteEvent(destVertex
-          .getVertexId(), Collections.singletonList(tezEvent), true));
-    }
-  }
-
-  public static class TerminateDuringRecoverTransition implements
-      SingleArcTransition<VertexImpl, VertexEvent> {
-
-    @Override
-    public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
-      LOG.info("Received a terminate during recovering, setting recovered"
-          + " state to KILLED");
-      vertex.recoveredState = VertexState.KILLED;
-    }
-
-  }
 
   public static class NullEdgeInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -3036,375 +2565,64 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
-  public static class BufferDataRecoverTransition implements
-      SingleArcTransition<VertexImpl, VertexEvent> {
-
-    @Override
-    public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
-      LOG.info("Received upstream event while still recovering"
-          + ", vertexId=" + vertex.logIdentifier
-          + ", vertexEventType=" + vertexEvent.getType());
-      if (vertexEvent.getType().equals(VertexEventType.V_ROUTE_EVENT)) {
-        VertexEventRouteEvent evt = (VertexEventRouteEvent) vertexEvent;
-        vertex.pendingRouteEvents.addAll(evt.getEvents());
-      } else if (vertexEvent.getType().equals(
-          VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) {
-        VertexEventSourceTaskAttemptCompleted evt =
-            (VertexEventSourceTaskAttemptCompleted) vertexEvent;
-        vertex.pendingReportedSrcCompletions.add(
-            evt.getCompletionEvent().getTaskAttemptId());
-      } else if (vertexEvent.getType().equals(
-          VertexEventType.V_SOURCE_VERTEX_STARTED)) {
-        VertexEventSourceVertexStarted startEvent =
-            (VertexEventSourceVertexStarted) vertexEvent;
-        int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
-        if(vertex.distanceFromRoot < distanceFromRoot) {
-          vertex.distanceFromRoot = distanceFromRoot;
-        }
-        ++vertex.numStartedSourceVertices;
-      } else if (vertexEvent.getType().equals(VertexEventType.V_INIT)) {
-        ++vertex.numInitedSourceVertices;
-      }
-    }
-  }
-
-
   public static class RecoverTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
-      VertexEventSourceVertexRecovered sourceRecoveredEvent =
-          (VertexEventSourceVertexRecovered) vertexEvent;
-      // Use distance from root from Recovery events as upstream vertices may 
not
-      // send source vertex started event that is used to compute distance
-      int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() 
+ 1;
-      if(vertex.distanceFromRoot < distanceFromRoot) {
-        vertex.distanceFromRoot = distanceFromRoot;
-      }
-
-      ++vertex.numRecoveredSourceVertices;
-
-      switch (sourceRecoveredEvent.getSourceVertexState()) {
-        case NEW:
-          // Nothing to do
-          break;
-        case INITED:
-          ++vertex.numInitedSourceVertices;
-          break;
-        case RUNNING:
-        case SUCCEEDED:
-          ++vertex.numInitedSourceVertices;
-          ++vertex.numStartedSourceVertices;
-          if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) {
-            vertex.pendingReportedSrcCompletions.addAll(
-                sourceRecoveredEvent.getCompletedTaskAttempts());
-          }
-          break;
-        case FAILED:
-        case KILLED:
-        case ERROR:
-          // Nothing to do
-          // Recover as if source vertices have not inited/started
-          break;
-        default:
-          LOG.warn("Received invalid SourceVertexRecovered event"
-              + ", vertex=" + vertex.logIdentifier
-              + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
-              + ", sourceVertexState=" + 
sourceRecoveredEvent.getSourceVertexState());
-          return vertex.finished(VertexState.ERROR);
-      }
-
-      if (vertex.numRecoveredSourceVertices !=
-          vertex.getInputVerticesCount()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Waiting for source vertices to recover"
-              + ", vertex=" + vertex.logIdentifier
-              + ", numRecoveredSourceVertices=" + 
vertex.numRecoveredSourceVertices
-              + ", totalSourceVertices=" + vertex.getInputVerticesCount());
-        }
-        return VertexState.RECOVERING;
-      }
-
-
-      // Complete recovery
-      VertexState endState = VertexState.NEW;
-      List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList();
-      switch (vertex.recoveredState) {
-        case NEW:
-          // Drop all root events if not inited properly
-          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
-          while (iterator.hasNext()) {
-            if (iterator.next().getEventType().equals(
-                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
-              iterator.remove();
-            }
-          }
-          // Trigger init if all sources initialized
-          if (vertex.numInitedSourceVertices == 
vertex.getInputVerticesCount()) {
-            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-                VertexEventType.V_INIT));
-          }
-          if (vertex.numStartedSourceVertices == 
vertex.getInputVerticesCount()) {
-            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-                VertexEventType.V_START));
-          }
-          endState = VertexState.NEW;
-          break;
-        case INITED:
-          vertex.vertexAlreadyInitialized = true;
-          try {
-            vertex.initializeCommitters();
-          } catch (Exception e) {
-            String msg = "Failed to initialize committers, vertex="
-                + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace(e);
-            LOG.error(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.INIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-          boolean successSetParallelism ;
-          try {
-            // recovering only edge manager
-            vertex.setParallelism(0,
-              null, vertex.recoveredSourceEdgeProperties, 
vertex.recoveredRootInputSpecUpdates, true, false);
-            successSetParallelism = true;
-          } catch (Exception e) {
-            successSetParallelism = false;
-          }
-          if (!successSetParallelism) {
-            String msg  = "Failed to recover edge managers, vertex=" + 
vertex.logIdentifier;
-            LOG.error(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.INIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-          // Recover tasks
-          if (vertex.tasks != null) {
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId()));
-            }
-          }
-          if (vertex.numInitedSourceVertices != 
vertex.getInputVerticesCount()) {
-            LOG.info("Vertex already initialized but source vertices have not"
-                + " initialized"
-                + ", vertexId=" + vertex.logIdentifier
-                + ", numInitedSourceVertices=" + 
vertex.numInitedSourceVertices);
-          } else {
-            if (vertex.numStartedSourceVertices == 
vertex.getInputVerticesCount()) {
-              vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-                VertexEventType.V_START));
-            }
-          }
-          endState = VertexState.INITED;
-          break;
-        case RUNNING:
-          // if commit in progress and desired state is not a succeeded one,
-          // move to failed
-          if (vertex.recoveryCommitInProgress) {
-            LOG.info("Recovered vertex was in the middle of a commit"
-                + ", failing Vertex=" + vertex.logIdentifier);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.COMMIT_FAILURE, null);
-            endState = VertexState.FAILED;
-            break;
-          }
-          try {
-            vertex.initializeCommitters();
-          } catch (Exception e) {
-            String msg = "Failed to initialize committers, vertex="
-                + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace(e);
-            LOG.error(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.INIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-          try {
-            vertex.setParallelism(vertex.numTasks, null, 
vertex.recoveredSourceEdgeProperties,
-              vertex.recoveredRootInputSpecUpdates, true, false);
-            successSetParallelism = true;
-          } catch (Exception e) {
-            successSetParallelism = false;
-          }
-          if (!successSetParallelism) {
-            String msg = "Failed to recover edge managers for vertex:" + 
vertex.logIdentifier;
-            LOG.error(msg);
-            vertex.finished(VertexState.FAILED,
-                VertexTerminationCause.INIT_FAILURE, msg);
-            endState = VertexState.FAILED;
-            break;
-          }
-          assert vertex.tasks.size() == vertex.numTasks;
-          if (vertex.tasks != null && vertex.numTasks != 0) {
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId()));
-            }
-            try {
-              vertex.recoveryCodeSimulatingStart();
-              vertex.unsetTasksNotYetScheduled();
-              endState = VertexState.RUNNING;
-            } catch (AMUserCodeException e) {
-              String msg = "Exception in " + e.getSource() + ", vertex=" + 
vertex.getLogIdentifier();
-              LOG.error(msg, e);
-              vertex.finished(VertexState.FAILED, 
VertexTerminationCause.AM_USERCODE_FAILURE,
-                  msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
-              endState = VertexState.FAILED;
-            }
-          } else {
-            endState = VertexState.SUCCEEDED;
-            vertex.finished(endState);
-          }
-          break;
-        case SUCCEEDED:
-          // recover tasks
-          assert vertex.tasks.size() == vertex.numTasks;
-          if (vertex.tasks != null  && vertex.numTasks != 0) {
-            TaskState taskState = TaskState.SUCCEEDED;
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId(),
-                      taskState));
-            }
-            // Wait for all tasks to recover and report back
-            try {
-              vertex.recoveryCodeSimulatingStart();
-              vertex.unsetTasksNotYetScheduled();
-              endState = VertexState.RUNNING;
-            } catch (AMUserCodeException e) {
-              String msg = "Exception in " + e.getSource() + ", vertex:" + 
vertex.getLogIdentifier();
-              LOG.error(msg, e);
-              vertex.finished(VertexState.FAILED, 
VertexTerminationCause.AM_USERCODE_FAILURE,
-                  msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
-              endState = VertexState.FAILED;
-            }
-          } else {
-            endState = vertex.recoveredState;
-            vertex.finished(endState);
-          }
-          break;
-        case FAILED:
-        case KILLED:
-          // vertex may be killed/failed before its tasks are scheduled. so 
here just recover vertex
-          // to the recovered state without waiting for its tasks' feedback 
and recover tasks to
-          // the corresponding state without recover its data.
-          if (vertex.tasks != null && vertex.numTasks != 0) {
-            TaskState taskState = TaskState.FAILED;
-            if (vertex.recoveredState == VertexState.KILLED) {
-              taskState = TaskState.KILLED;
-            }
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId(),
-                      taskState, false));
-            }
-          }
-          endState = vertex.recoveredState;
-          vertex.finished(endState);
-          break;
-        default:
-          LOG.warn("Invalid recoveredState found when trying to recover"
-              + " vertex, recoveredState=" + vertex.recoveredState);
-          vertex.finished(VertexState.ERROR);
-          endState = VertexState.ERROR;
-          break;
-      }
-
-      LOG.info("Recovered Vertex State"
-          + ", vertexId=" + vertex.logIdentifier
-          + ", state=" + endState
-          + ", numInitedSourceVertices" + vertex.numInitedSourceVertices
-          + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
-          + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
-          + ", tasksIsNull=" + (vertex.tasks == null)
-          + ", numTasks=" + ( vertex.tasks == null ? 0 : vertex.tasks.size()));
-
-      for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
-        vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
-            entry.getKey().getVertexId(),
-            vertex.vertexId, endState, completedTaskAttempts,
-            vertex.getDistanceFromRoot()));
-      }
-      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, 
VertexState.INITED)
-          .contains(endState)) {
-        // Send events downstream
-        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
-        vertex.recoveredEvents.clear();
-        if (!vertex.pendingRouteEvents.isEmpty()) {
-          try {
-            vertex.handleRoutedTezEvents(vertex.pendingRouteEvents, false, 
true);
-            vertex.pendingRouteEvents.clear();
-          } catch (AMUserCodeException e) {
-            String msg = "Exception in " + e.getSource() + ", vertex=" + 
vertex.getLogIdentifier();
-            LOG.error(msg, e);
-            vertex.finished(VertexState.FAILED, 
VertexTerminationCause.AM_USERCODE_FAILURE,
-                msg + ", " + e.getMessage() + ", " + 
ExceptionUtils.getStackTrace(e.getCause()));
-            endState = VertexState.FAILED;
-          }
-        }
-      } else {
-        // Ensure no recovered events
-        if (!vertex.recoveredEvents.isEmpty()) {
-          throw new RuntimeException("Invalid Vertex state"
-              + ", found non-zero recovered events in invalid state"
-              + ", recoveredState=" + endState
-              + ", recoveredEvents=" + vertex.recoveredEvents.size());
-        }
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) 
vertexEvent;
+      // Recover using the desired state supplied by the DAG (the DAG has already completed)
+      VertexState desiredState = recoverEvent.getDesiredState();
+      switch (desiredState) {
+      case SUCCEEDED:
+        vertex.succeededTaskCount = vertex.numTasks;
+        vertex.completedTaskCount = vertex.numTasks;
+        break;
+      case KILLED:
+        vertex.killedTaskCount = vertex.numTasks;
+        break;
+      case FAILED:
+      case ERROR:
+        vertex.failedTaskCount = vertex.numTasks;
+        break;
+      default:
+        LOG.info("Unhandled desired state provided by DAG"
+            + ", vertex=" + vertex.logIdentifier
+            + ", state=" + desiredState);
+        return vertex.finished(VertexState.ERROR);
       }
-      return endState;
-    }
-
-  }
 
-  public static class IgnoreInitInInitedTransition implements
-      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      LOG.info("Received event during INITED state"
+      LOG.info("DAG informed vertices of its final completed state"
           + ", vertex=" + vertex.logIdentifier
-          + ", eventType=" + event.getType());
-      if (!vertex.vertexAlreadyInitialized) {
-        LOG.error("Vertex not initialized but in INITED state"
-            + ", vertexId=" + vertex.logIdentifier);
-        return vertex.finished(VertexState.ERROR);
-      } else {
-        return VertexState.INITED;
-      }
+          + ", desiredState=" + desiredState);
+      return vertex.finished(recoverEvent.getDesiredState());
     }
   }
-
-
-
+  
   public static class InitTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      // recover from recovery data (NEW->FAILED/KILLED)
+      if (vertex.recoveryData != null
+          && !vertex.recoveryData.isVertexInited()
+          && vertex.recoveryData.isVertexFinished()) {
+        VertexFinishedEvent finishedEvent = 
vertex.recoveryData.getVertexFinishedEvent();
+        vertex.diagnostics.add(finishedEvent.getDiagnostics());
+        return vertex.finished(finishedEvent.getState());
+      }
+
       VertexState vertexState = VertexState.NEW;
       vertex.numInitedSourceVertices++;
-      // TODO fix this as part of TEZ-1008
-      // Should have a different way to infer source vertices INITED
-      // as compared to a recovery triggered INIT
-      // In normal flow, upstream vertices send a V_INIT downstream to
-      // trigger an init of the downstream vertex. In case of recovery,
-      // upstream vertices may not send this event if they are already in a
-      // RUNNING or completed state. Hence, recovering vertices may send
-      // themselves a V_INIT to trigger a transition. Hence, the count may
-      // go one over.
       if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
-          (vertex.numInitedSourceVertices == vertex.sourceVertices.size()
-            || vertex.numInitedSourceVertices == 
(vertex.sourceVertices.size()+1))) {
-        vertexState = handleInitEvent(vertex, event);
+          (vertex.numInitedSourceVertices == vertex.sourceVertices.size())) {
+        vertexState = handleInitEvent(vertex);
         if (vertexState != VertexState.FAILED) {
           if (vertex.targetVertices != null && 
!vertex.targetVertices.isEmpty()) {
             for (Vertex target : vertex.targetVertices.keySet()) {
               vertex.getEventHandler().handle(new 
VertexEvent(target.getVertexId(),
-                VertexEventType.V_INIT));
+                  VertexEventType.V_INIT));
             }
           }
         }
@@ -3412,11 +2630,12 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       return vertexState;
     }
 
-    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
+    private VertexState handleInitEvent(VertexImpl vertex) {
       VertexState state = vertex.setupVertex();
       if (state.equals(VertexState.FAILED)) {
         return state;
       }
+
       // TODO move before to handle NEW state
       if (vertex.targetVertices != null) {
         for (Edge e : vertex.targetVertices.values()) {
@@ -3448,13 +2667,15 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
             + " to set #tasks for the vertex " + vertex.getLogIdentifier());
 
         if (vertex.inputsWithInitializers != null) {
-          LOG.info("Vertex will initialize from input initializer. " + 
vertex.logIdentifier);
-          try {
-            vertex.setupInputInitializerManager();
-          } catch (TezException e) {
-            String msg = "Fail to create InputInitializerManager, " + 
ExceptionUtils.getStackTrace(e);
-            LOG.info(msg);
-            return vertex.finished(VertexState.FAILED, 
VertexTerminationCause.INIT_FAILURE, msg);
+          if (vertex.recoveryData == null || 
!vertex.recoveryData.shouldSkipInit()) {
+            LOG.info("Vertex will initialize from input initializer. " + 
vertex.logIdentifier);
+            try {
+              vertex.setupInputInitializerManager();
+            } catch (TezException e) {
+              String msg = "Fail to create InputInitializerManager, " + 
ExceptionUtils.getStackTrace(e);
+              LOG.info(msg);
+              return vertex.finished(VertexState.FAILED, 
VertexTerminationCause.INIT_FAILURE, msg);
+            }
           }
           return VertexState.INITIALIZING;
         } else {
@@ -3484,7 +2705,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + 
vertex.logIdentifier);
         vertex.createTasks();
         // this block may return VertexState.INITIALIZING
-        if (vertex.inputsWithInitializers != null) {
+        if (vertex.inputsWithInitializers != null &&
+            (vertex.recoveryData == null || 
!vertex.recoveryData.shouldSkipInit())) {
           LOG.info("Vertex will initialize from input initializer. " + 
vertex.logIdentifier);
           try {
             vertex.setupInputInitializerManager();
@@ -3608,7 +2830,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       List<TezEvent> inputInfoEvents = iEvent.getEvents();
       try {
         if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
-          vertex.handleRoutedTezEvents(inputInfoEvents, false, false);
+          vertex.initGeneratedEvents.addAll(inputInfoEvents);
+          vertex.handleRoutedTezEvents(inputInfoEvents, false);
         }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex:" + 
vertex.getLogIdentifier();
@@ -3718,8 +2941,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   public static class StartTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
-  @Override
-  public VertexState transition(VertexImpl vertex, VertexEvent event) {
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
       Preconditions.checkState(vertex.getState() == VertexState.INITED,
           "Unexpected state " + vertex.getState() + " for " + 
vertex.logIdentifier);
       // if the start signal is pending this event is a fake start event to 
trigger this transition
@@ -3739,19 +2962,22 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       if (completelyConfiguredSent.compareAndSet(false, true)) {
         stateChangeNotifier.stateChanged(vertexId, new 
VertexStateUpdate(vertexName,
             org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+        logVertexConfigurationDoneEvent();
       }
-    }    
+    }
   }
 
   private VertexState startVertex() {
-    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-    // IMPORTANT - Until Recovery is fixed to use normal state transitions, if 
any code is added 
-    // here then please check if it needs to be duplicated in 
recoveryCodeSimulatingStart().
-    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     Preconditions.checkState(getState() == VertexState.INITED,
         "Vertex must be inited " + logIdentifier);
 
-    startedTime = clock.getTime();
+    if (recoveryData != null && recoveryData.isVertexStarted()) {
+      VertexStartedEvent vertexStartedEvent = 
recoveryData.getVertexStartedEvent();
+      this.startedTime = vertexStartedEvent.getStartTime();
+    } else {
+      this.startedTime = clock.getTime();
+    }
+
     try {
       vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, 
pendingReportedSrcCompletions));
     } catch (AMUserCodeException e) {
@@ -3967,14 +3193,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       String msg = "Exception in " + e.getSource() + ", vertex:" + 
vertex.getLogIdentifier();
       LOG.error(msg, e);
       
-      if (vertex.getState() == VertexState.RECOVERING) {
-        LOG.info("Received a user code error during recovering, setting 
recovered"
-            + " state to FAILED");
-        vertex.addDiagnostic(msg + "," + 
ExceptionUtils.getStackTrace(e.getCause()));
-        
vertex.trySetTerminationCause(VertexTerminationCause.AM_USERCODE_FAILURE);
-        vertex.recoveredState = VertexState.FAILED;
-        return VertexState.RECOVERING;
-      } else if (vertex.getState() == VertexState.RUNNING || vertex.getState() 
== VertexState.COMMITTING) {
+      if (vertex.getState() == VertexState.RUNNING || vertex.getState() == 
VertexState.COMMITTING) {
         vertex.addDiagnostic(msg + "," + 
ExceptionUtils.getStackTrace(e.getCause()));
         vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE,
             TaskTerminationCause.AM_USERCODE_FAILURE);
@@ -4269,10 +3488,9 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
-      boolean recovered = rEvent.isRecovered();
       List<TezEvent> tezEvents = rEvent.getEvents();
       try {
-        vertex.handleRoutedTezEvents(tezEvents, recovered, false);
+        vertex.handleRoutedTezEvents(tezEvents, false);
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex=" + 
vertex.getLogIdentifier();
         LOG.error(msg, e);
@@ -4413,37 +3631,11 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     return new TaskAttemptEventInfo(nextFromEventId, events, 
nextPreRoutedFromEventId);
   }
 
-  private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean 
recovered, boolean isPendingEvents) throws AMUserCodeException {
-    if (getAppContext().isRecoveryEnabled()
-        && !recovered
-        && !isPendingEvents
-        && !tezEvents.isEmpty()) {
-      List<TezEvent> recoveryEvents =
-          Lists.newArrayList();
-      for (TezEvent tezEvent : tezEvents) {
-        if (!isEventFromVertex(this, tezEvent.getSourceInfo())) {
-          continue;
-        }
-        if  
(tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
-          || tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
-          || 
tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
-          || 
tezEvent.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
-          recoveryEvents.add(tezEvent);
-        }
-      }
-      if (!recoveryEvents.isEmpty()) {
-        VertexRecoverableEventsGeneratedEvent historyEvent =
-            new VertexRecoverableEventsGeneratedEvent(vertexId,
-                recoveryEvents);
-        appContext.getHistoryHandler().handle(
-            new DAGHistoryEvent(getDAGId(), historyEvent));
-      }
-    }
+  private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean 
isPendingEvents) throws AMUserCodeException {
     for(TezEvent tezEvent : tezEvents) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Vertex: " + getLogIdentifier() + " routing event: "
-            + tezEvent.getEventType()
-            + " Recovered:" + recovered);
+            + tezEvent.getEventType());
       }
       EventMetaData sourceMeta = tezEvent.getSourceInfo();
       switch(tezEvent.getEventType()) {
@@ -4580,44 +3772,6 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           srcEdge.sendTezEventToSourceTasks(tezEvent);
         }
         break;
-      case TASK_ATTEMPT_FAILED_EVENT:
-        {
-          checkEventSourceMetadata(this, sourceMeta);
-          TaskAttemptTerminationCause errCause = null;
-          switch (sourceMeta.getEventGenerator()) {
-          case INPUT:
-            errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
-            break;
-          case PROCESSOR:
-            errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
-            break;
-          case OUTPUT:
-            errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
-            break;
-          case SYSTEM:
-            errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
-            break;
-          default:
-            throw new TezUncheckedException("Unknown 
EventProducerConsumerType: " +
-                sourceMeta.getEventGenerator());
-          }
-          TaskAttemptFailedEvent taskFailedEvent =
-              (TaskAttemptFailedEvent) tezEvent.getEvent();
-          getEventHandler().handle(
-              new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
-                  TaskAttemptEventType.TA_FAILED,
-                  "Error: " + taskFailedEvent.getDiagnostics(), 
-                  errCause)
-              );
-        }
-        break;
-      case TASK_ATTEMPT_COMPLETED_EVENT:
-        {
-          checkEventSourceMetadata(this, sourceMeta);
-          getEventHandler().handle(
-              new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), 
TaskAttemptEventType.TA_DONE));
-        }
-        break;
       default:
         throw new TezUncheckedException("Unhandled tez event type: "
             + tezEvent.getEventType());
@@ -4708,12 +3862,12 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           return org.apache.tez.dag.api.event.VertexState.FAILED;
         case KILLED:
           return org.apache.tez.dag.api.event.VertexState.KILLED;
-        case NEW:
         case INITIALIZING:
+          return org.apache.tez.dag.api.event.VertexState.INITIALIZING;
+        case NEW:
         case INITED:
         case ERROR:
         case TERMINATING:
-        case RECOVERING:
         default:
           throw new TezUncheckedException(
               "Not expecting state updates for state: " + vertexState + ", 
VertexID: " + vertexId);
@@ -5104,4 +4258,100 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
       LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
     }
   }
+
+  /**
+   * This is for recovery when VertexConfigurationDoneEvent is seen.
+   */
+  public static class NoOpVertexManager extends VertexManagerPlugin {
+
+    private VertexConfigurationDoneEvent configurationDoneEvent;
+    private boolean setParallelismInInitializing = false;
+
+    public NoOpVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("initialize NoOpVertexManager");
+      }
+      configurationDoneEvent = new VertexConfigurationDoneEvent();
+      configurationDoneEvent.fromProtoStream(new 
ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
+      String vertexName = getContext().getVertexName();
+      if (getContext().getVertexNumTasks(vertexName) == -1) {
+        
Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), 
"SetParallelism must be called "
+            + "when numTasks is -1");
+        setParallelismInInitializing = true;
+        getContext().registerForVertexStateUpdates(vertexName,
+            
Sets.newHashSet(org.apache.tez.dag.api.event.VertexState.INITIALIZING));
+      }
+      getContext().vertexReconfigurationPlanned();
+    }
+
+    @Override
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions)
+        throws Exception {
+      // Apply the recovered VertexConfigurationDoneEvent and then schedule all the tasks.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("onVertexStarted is invoked in NoOpVertexManager, vertex=" + 
getContext().getVertexName());
+      }
+      if (!setParallelismInInitializing && 
configurationDoneEvent.isSetParallelismCalled()) {
+        reconfigureVertex();
+      }
+      getContext().doneReconfiguringVertex();
+      int numTasks = 
getContext().getVertexNumTasks(getContext().getVertexName());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Schedule all the tasks, numTask=" + numTasks);
+      }
+      List<ScheduleTaskRequest> tasks = new ArrayList<ScheduleTaskRequest>();
+      for (int i=0;i<numTasks;++i) {
+        tasks.add(ScheduleTaskRequest.create(i, null));
+      }
+      getContext().scheduleTasks(tasks);
+    }
+
+    @Override
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt)
+        throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("onSourceTaskCompleted is invoked in NoOpVertexManager, 
vertex=" + getContext().getVertexName());
+      }
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
+        throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("onVertexManagerEventReceived is invoked in 
NoOpVertexManager, vertex=" + getContext().getVertexName());
+      }
+    }
+
+    @Override
+    public

<TRUNCATED>

Reply via email to