http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
deleted file mode 100644
index e389d64..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ /dev/null
@@ -1,1340 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.api.records.DAGProtos;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
-import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
-import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
-import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
-import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
-import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
-import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.VertexState;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
-import org.apache.tez.dag.app.dag.event.DAGEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
-import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
-import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexFinishedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexStartedEvent;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
-import org.apache.tez.runtime.api.OutputCommitter;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestVertexRecovery {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestVertexRecovery.class);
-
-  private DrainDispatcher dispatcher;
-
-  private AppContext mockAppContext;
-  private ApplicationId appId = ApplicationId.newInstance(
-      System.currentTimeMillis(), 1);
-  private DAGImpl dag;
-  private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-  private String user = "user";
-
-  private long initRequestedTime = 100L;
-  private long initedTime = initRequestedTime + 100L;
-
-  /*
-   * v1 v2 \ / v3
-   */
-  private DAGPlan createDAGPlan() {
-    DAGPlan dag =
-        DAGPlan
-            .newBuilder()
-            .setName("testverteximpl")
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex1")
-                    .setType(PlanVertexType.NORMAL)
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host1")
-                            .addRack("rack1").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(1)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("").setTaskModule("x1.y1").build())
-                    .addOutEdgeId("e1")
-                    .addOutputs(
-                        DAGProtos.RootInputLeafOutputProto
-                            .newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("output").build())
-                            .setName("outputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName(
-                                        CountingOutputCommitter.class.getName())))
-                    .build())
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex2")
-                    .setType(PlanVertexType.NORMAL)
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host2")
-                            .addRack("rack2").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(2)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("").setTaskModule("x2.y2").build())
-                    .addOutEdgeId("e2").build())
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex3")
-                    .setType(PlanVertexType.NORMAL)
-                    .setProcessorDescriptor(
-                        TezEntityDescriptorProto.newBuilder().setClassName(
-                            "x3.y3"))
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host3")
-                            .addRack("rack3").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(2)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("foo").setTaskModule("x3.y3").build())
-                    .addInEdgeId("e1")
-                    .addInEdgeId("e2")
-                    .addOutputs(
-                        DAGProtos.RootInputLeafOutputProto
-                            .newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("output").build())
-                            .setName("outputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName(
-                                        CountingOutputCommitter.class.getName())))
-                    .build()
-
-            )
-
-            .addEdge(
-                EdgePlan
-                    .newBuilder()
-                    .setEdgeDestination(
-                        TezEntityDescriptorProto.newBuilder().setClassName(
-                            "i3_v1"))
-                    .setInputVertexName("vertex1")
-                    .setEdgeSource(
-                        TezEntityDescriptorProto.newBuilder()
-                            .setClassName("o1"))
-                    .setOutputVertexName("vertex3")
-                    .setDataMovementType(
-                        PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1")
-                    .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                    .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                    .build())
-            .addEdge(
-                EdgePlan
-                    .newBuilder()
-                    .setEdgeDestination(
-                        TezEntityDescriptorProto.newBuilder().setClassName(
-                            "i3_v2"))
-                    .setInputVertexName("vertex2")
-                    .setEdgeSource(
-                        TezEntityDescriptorProto.newBuilder()
-                            .setClassName("o2"))
-                    .setOutputVertexName("vertex3")
-                    .setDataMovementType(
-                        PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2")
-                    .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                    .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                    .build()).build();
-
-    return dag;
-  }
-
-  private DAGPlan createDAGPlanSingleVertex() {
-    DAGPlan dag =
-        DAGPlan
-            .newBuilder()
-            .setName("testverteximpl")
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex1")
-                    .setType(PlanVertexType.NORMAL)
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host1")
-                            .addRack("rack1").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(-1)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("").setTaskModule("x1.y1").build())
-                    .addInputs(RootInputLeafOutputProto.newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("input").build())
-                            .setName("inputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName("inputinitlizer"))
-                            .build())
-                    .addOutputs(
-                        DAGProtos.RootInputLeafOutputProto
-                            .newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("output").build())
-                            .setName("outputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName(
-                                        CountingOutputCommitter.class.getName())))
-                    .build()).build();
-    return dag;
-  }
-
-  /*
-   * v1
-   *  |
-   * v2
-   */
-  private DAGPlan createDAGPlanMR() {
-    DAGPlan dag =
-        DAGPlan
-            .newBuilder()
-            .setName("testverteximpl")
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex1")
-                    .setType(PlanVertexType.NORMAL)
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host1")
-                            .addRack("rack1").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(1)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("").setTaskModule("x1.y1").build())
-                    .addOutEdgeId("e1")
-                    .addOutputs(
-                        DAGProtos.RootInputLeafOutputProto
-                            .newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("output").build())
-                            .setName("outputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName(
-                                        CountingOutputCommitter.class.getName())))
-                    .build())
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex2")
-                    .setType(PlanVertexType.NORMAL)
-                    .setProcessorDescriptor(
-                        TezEntityDescriptorProto.newBuilder().setClassName(
-                            "x2.y2"))
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host2")
-                            .addRack("rack2").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(2)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("foo").setTaskModule("x2.y2").build())
-                    .addInEdgeId("e1")
-                    .addOutputs(
-                        DAGProtos.RootInputLeafOutputProto
-                            .newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("output").build())
-                            .setName("outputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName(
-                                        CountingOutputCommitter.class.getName())))
-                    .build()
-
-            )
-            .addEdge(
-                EdgePlan
-                    .newBuilder()
-                    .setEdgeDestination(
-                        TezEntityDescriptorProto.newBuilder().setClassName(
-                            "i2_v1"))
-                    .setInputVertexName("vertex1")
-                    .setEdgeSource(
-                        TezEntityDescriptorProto.newBuilder()
-                            .setClassName("o1"))
-                    .setOutputVertexName("vertex2")
-                    .setDataMovementType(
-                        PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1")
-                    .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                    .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                    .build())
-            .build();
-
-    return dag;
-  }
-
-  class DAGEventHandler implements EventHandler<DAGEvent> {
-    @Override
-    public void handle(DAGEvent event) {
-      dag.handle(event);
-    }
-  }
-
-  class VertexEventHanlder implements EventHandler<VertexEvent> {
-
-    private List<VertexEvent> events = new ArrayList<VertexEvent>();
-
-    @Override
-    public void handle(VertexEvent event) {
-      events.add(event);
-      ((VertexImpl) dag.getVertex(event.getVertexId())).handle(event);
-    }
-
-    public List<VertexEvent> getEvents() {
-      return this.events;
-    }
-  }
-
-  class TaskEventHandler implements EventHandler<TaskEvent> {
-
-    private List<TaskEvent> events = new ArrayList<TaskEvent>();
-
-    @Override
-    public void handle(TaskEvent event) {
-      events.add(event);
-      ((TaskImpl) dag.getVertex(event.getTaskID().getVertexID()).getTask(
-          event.getTaskID())).handle(event);
-    }
-
-    public List<TaskEvent> getEvents() {
-      return events;
-    }
-  }
-
-  class TaskAttemptEventHandler implements EventHandler<TaskAttemptEvent> {
-
-    @Override
-    public void handle(TaskAttemptEvent event) {
-      // TezTaskID taskId = event.getTaskAttemptID().getTaskID();
-      // ((TaskAttemptImpl) vertex1.getTask(taskId).getAttempt(
-      // event.getTaskAttemptID())).handle(event);
-    }
-  }
-
-  private DAGEventHandler dagEventHandler;
-  private VertexEventHanlder vertexEventHandler;
-  private TaskEventHandler taskEventHandler;
-
-  @Before
-  public void setUp() throws IOException {
-
-    dispatcher = new DrainDispatcher();
-    dispatcher.register(DAGAppMasterEventType.class, mock(EventHandler.class));
-    dagEventHandler = new DAGEventHandler();
-    dispatcher.register(DAGEventType.class, dagEventHandler);
-    vertexEventHandler = new VertexEventHanlder();
-    dispatcher.register(VertexEventType.class, vertexEventHandler);
-    taskEventHandler = new TaskEventHandler();
-    dispatcher.register(TaskEventType.class, taskEventHandler);
-    dispatcher.register(TaskAttemptEventType.class,
-        new TaskAttemptEventHandler());
-    dispatcher.init(new Configuration());
-    dispatcher.start();
-
-    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-
-    DAGPlan dagPlan = createDAGPlan();
-    dag =
-        new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
-            new Credentials(), new SystemClock(), user,
-            mock(TaskHeartbeatHandler.class), mockAppContext);
-    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
-    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
-    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
-
-    dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null));
-    dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName"));
-    LOG.info("finish setUp");
-  }
-
-  /**
-   * vertex1(New) -> StartRecoveryTransition(SUCCEEDED)
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Desired_SUCCEEDED() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
-        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.SUCCEEDED));
-    dispatcher.await();
-    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
-    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
-    // recover its task
-    assertTaskRecoveredEventSent(vertex1);
-
-    // vertex3 is still in NEW, when the desiredState is
-    // Completed State, each vertex recovery by itself, not depend on its parent
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    assertEquals(VertexState.NEW, vertex3.getState());
-    // no VertexEvent pass to downstream vertex
-    assertEquals(0, vertexEventHandler.getEvents().size());
-
-  }
-
-  @Test(timeout = 5000)
-  public void testRecovery_SetParallelism() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    int oldNumTasks = 10;
-    VertexState recoveredState = vertex1
-        .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1",
-            initRequestedTime, initedTime, oldNumTasks, "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1
-        .getVertexId(), 5, null, null, null, oldNumTasks));
-    assertEquals(5, vertex1.getTotalTasks());
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.SUCCEEDED));
-    dispatcher.await();
-    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
-    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
-    // recover its task
-    assertTaskRecoveredEventSent(vertex1);
-
-    // vertex3 is still in NEW, when the desiredState is
-    // Completed State, each vertex recovery by itself, not depend on its parent
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    assertEquals(VertexState.NEW, vertex3.getState());
-    // no VertexEvent pass to downstream vertex
-    assertEquals(0, vertexEventHandler.getEvents().size());
-  }
-  
-  @Test(timeout = 5000)
-  public void testRecovery_SetParallelismMultiple() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    int oldNumTasks = 10;
-    VertexState recoveredState = vertex1
-        .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1",
-            initRequestedTime, initedTime, oldNumTasks, "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1
-        .getVertexId(), 5, null, null, null, oldNumTasks));
-    assertEquals(5, vertex1.getTotalTasks());
-    recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1
-        .getVertexId(), 7, null, null, null, 5));
-    assertEquals(7, vertex1.getTotalTasks());
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.SUCCEEDED));
-    dispatcher.await();
-    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
-    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
-    // recover its task
-    assertTaskRecoveredEventSent(vertex1);
-
-    // vertex3 is still in NEW, when the desiredState is
-    // Completed State, each vertex recovery by itself, not depend on its parent
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    assertEquals(VertexState.NEW, vertex3.getState());
-    // no VertexEvent pass to downstream vertex
-    assertEquals(0, vertexEventHandler.getEvents().size());
-
-  }
-
-
-  /**
-   * vertex1(New) -> StartRecoveryTransition(SUCCEEDED)
-   * @throws IOException 
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Desired_SUCCEEDED_OnlySummaryLog() throws IOException {
-    DAGPlan dagPlan = createDAGPlanSingleVertex();
-    dag =
-        new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
-            new Credentials(), new SystemClock(), user,
-            mock(TaskHeartbeatHandler.class), mockAppContext);
-    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
-
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexFinishedEvent vertexFinishEvent = new VertexFinishedEvent();
-    vertexFinishEvent.fromSummaryProtoStream(SummaryEventProto.newBuilder()
-        .setDagId(dag.getID().toString())
-        .setEventType(HistoryEventType.VERTEX_FINISHED.ordinal())
-        .setTimestamp(100L)
-        .setEventPayload(VertexFinishStateProto.newBuilder()
-            .setNumTasks(2)
-            .setState(VertexState.SUCCEEDED.ordinal())
-            .setVertexId(vertex1.getVertexId().toString()).build().toByteString())
-        .build());
-    VertexState recoveredState = vertex1.restoreFromEvent(vertexFinishEvent);
-    // numTasks is recovered from summary log
-    assertEquals(2, vertex1.numTasks);
-    assertEquals(VertexState.SUCCEEDED, recoveredState);
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.SUCCEEDED));
-    dispatcher.await();
-    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
-    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
-  }
-
-  /**
-   * vertex1(New) -> StartRecoveryTransition(FAILED)
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Desired_FAILED() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
-        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.FAILED));
-    dispatcher.await();
-    assertEquals(VertexState.FAILED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.failedTaskCount);
-    assertEquals(0, vertex1.completedTaskCount);
-    // recover its task
-    assertTaskRecoveredEventSent(vertex1);
-
-    // vertex3 is still in NEW, when the desiredState is
-    // Completed State, each vertex recovery by itself, not depend on its parent
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    assertEquals(VertexState.NEW, vertex3.getState());
-    // no VertexEvent pass to downstream vertex
-    assertEquals(0, vertexEventHandler.getEvents().size());
-  }
-
-  /**
-   * vertex1(New) -> StartRecoveryTransition(KILLED)
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Desired_KILLED() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
-        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.KILLED));
-    dispatcher.await();
-    assertEquals(VertexState.KILLED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.killedTaskCount);
-    assertEquals(0, vertex1.completedTaskCount);
-    // recover its task
-    assertTaskRecoveredEventSent(vertex1);
-
-    // vertex3 is still in NEW, when the desiredState is
-    // Completed State, each vertex recovery by itself, not depend on its parent
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    assertEquals(VertexState.NEW, vertex3.getState());
-    // no VertexEvent pass to downstream vertex
-    assertEquals(0, vertexEventHandler.getEvents().size());
-  }
-
-  /**
-   * vertex1(New) -> StartRecoveryTransition(ERROR)
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Desired_ERROR() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
-        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.ERROR));
-    dispatcher.await();
-    assertEquals(VertexState.ERROR, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.failedTaskCount);
-    assertEquals(0, vertex1.completedTaskCount);
-    // recover its task
-    assertTaskRecoveredEventSent(vertex1);
-
-    // vertex3 is still in NEW, when the desiredState is
-    // Completed State, each vertex recovery by itself, not depend on its parent
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    assertEquals(VertexState.NEW, vertex3.getState());
-    // no VertexEvent pass to downstream vertex
-    assertEquals(0, vertexEventHandler.getEvents().size());
-  }
-
-  private TezEvent createTezEvent() {
-    return new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)),
-        new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", null,
-            null));
-  }
-
-  /**
-   * vertex1(New) -> restoreFromDataMovementEvent -> StartRecoveryTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_New_Desired_RUNNING() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexState recoveredState =
-        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.NEW, recoveredState);
-    assertEquals(1, vertex1.recoveredEvents.size());
-
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-
-    // InputDataInformationEvent is removed
-    assertEquals(0, vertex1.recoveredEvents.size());
-    // V_INIT and V_START is sent
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    // wait for recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-
-  }
-
-  /**
-   * Asserts that the number of T_RECOVER task events captured by
-   * taskEventHandler whose task belongs to the given vertex equals
-   * vertex.getTotalTasks().
-   *
-   * @param vertex the vertex whose task recovery events are counted
-   */
-  private void assertTaskRecoveredEventSent(VertexImpl vertex) {
-    int sentNum = 0;
-    for (TaskEvent event : taskEventHandler.getEvents()) {
-      if (event.getType() == TaskEventType.T_RECOVER) {
-        TaskEventRecoverTask recoverEvent = (TaskEventRecoverTask) event;
-        if (recoverEvent.getTaskID().getVertexID().equals(vertex.getVertexId())) {
-          sentNum++;
-        }
-      }
-    }
-    // Message names the real event class and keeps words separated so a
-    // failure reads cleanly.
-    assertEquals("expect " + vertex.getTotalTasks()
-        + " TaskEventRecoverTask sent for vertex:" + vertex.getVertexId()
-        + " but actually sent " + sentNum, vertex.getTotalTasks(), sentNum);
-  }
-
-  /**
-   * Checks that the vertex exposes a non-null committer map and that every
-   * committer in it (cast to CountingOutputCommitter) records zero aborts,
-   * zero commits, one init and one setup.
-   */
-  private void assertOutputCommitters(VertexImpl vertex){
-    assertNotNull(vertex.getOutputCommitters());
-    for (OutputCommitter each : vertex.getOutputCommitters().values()) {
-      CountingOutputCommitter counting = (CountingOutputCommitter) each;
-      assertEquals(0, counting.abortCounter);
-      assertEquals(0, counting.commitCounter);
-      assertEquals(1, counting.initCounter);
-      assertEquals(1, counting.setupCounter);
-    }
-  }
-
-  private void restoreFromInitializedEvent(VertexImpl vertex) {
-    long initTimeRequested = 100L;
-    long initedTime = initTimeRequested + 100L;
-    VertexState recoveredState =
-        vertex.restoreFromEvent(new VertexInitializedEvent(vertex
-            .getVertexId(), "vertex1", initTimeRequested, initedTime, 
vertex.getTotalTasks(),
-            "", null));
-    assertEquals(VertexState.INITED, recoveredState);
-    assertEquals(vertex.getTotalTasks(), vertex.getTasks().size());
-    assertEquals(initTimeRequested, vertex.initTimeRequested);
-    assertEquals(initedTime, vertex.initedTime);
-  }
-
-  /**
-   * restoreFromVertexInitializedEvent -> StartRecoveryTransition
-   * <p>
-   * Restores a VertexInitializedEvent and then a
-   * VertexRecoverableEventsGeneratedEvent into vertex1, and sends
-   * VertexEventRecoverVertex with desired state RUNNING. Expects vertex1 to be
-   * RUNNING with no recovered events left, one T_RECOVER event per task and
-   * its output committers set up, while vertex3 remains in RECOVERING.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Inited_Desired_RUNNING() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-
-    VertexState recoveredState =
-        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.INITED, recoveredState);
-
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-
-    // InputDataInformationEvent is removed
-    assertEquals(0, vertex1.recoveredEvents.size());
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    // task recovered event is sent
-    assertTaskRecoveredEventSent(vertex1);
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-  }
-
-  /**
-   * restoreFromVertexInitializedEvent -> restoreFromVertexStartedEvent ->
-   * StartRecoveryTransition
-   * <p>
-   * Restores initialized, started and recoverable-events-generated history
-   * events into vertex1, checking the recovered state and the start timestamps
-   * after each step, then sends VertexEventRecoverVertex with desired state
-   * RUNNING. Expects vertex1 to be RUNNING with no recovered events left, while
-   * vertex3 remains in RECOVERING.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Started_Desired_RUNNING() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-
-    long startTimeRequested = initedTime + 100L;
-    long startedTime = startTimeRequested + 100L;
-    VertexState recoveredState =
-        vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(),
-            startTimeRequested, startedTime));
-    assertEquals(VertexState.RUNNING, recoveredState);
-    assertEquals(startTimeRequested, vertex1.startTimeRequested);
-    assertEquals(startedTime, vertex1.startedTime);
-
-    recoveredState =
-        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.RUNNING, recoveredState);
-    assertEquals(1, vertex1.recoveredEvents.size());
-
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-
-    // InputDataInformationEvent is removed
-    assertEquals(0, vertex1.recoveredEvents.size());
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    // task recovered event is sent
-    assertTaskRecoveredEventSent(vertex1);
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-  }
-
-  /**
-   * restoreFromVertexInitializedEvent -> restoreFromVertexStartedEvent ->
-   * restoreFromVertexFinishedEvent -> StartRecoveryTransition
-   * <p>
-   * Restores initialized, started and finished (SUCCEEDED) history events into
-   * vertex1, then sends VertexEventRecoverVertex with desired state RUNNING.
-   * Expects vertex1 to end up RUNNING even though SUCCEEDED was recovered,
-   * with one T_RECOVER event per task, while vertex3 remains in RECOVERING.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Finished_Desired_RUNNING() {
-    // v1: initFromInitializedEvent
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-
-    // v1: initFromStartedEvent
-    long startRequestedTime = initedTime + 100L;
-    long startTime = startRequestedTime + 100L;
-    VertexState recoveredState =
-        vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(),
-            startRequestedTime, startTime));
-    assertEquals(VertexState.RUNNING, recoveredState);
-
-    // v1: initFromFinishedEvent
-    long finishTime = startTime + 100L;
-    recoveredState =
-        vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
-            "vertex1", 1, initRequestedTime, initedTime, startRequestedTime,
-            startTime, finishTime, VertexState.SUCCEEDED, "",
-            new TezCounters(), new VertexStats(), null));
-    assertEquals(finishTime, vertex1.finishTime);
-    assertEquals(VertexState.SUCCEEDED, recoveredState);
-    assertEquals(false, vertex1.recoveryCommitInProgress);
-
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-
-    // no recovered events are pending on vertex1
-    assertEquals(0, vertex1.recoveredEvents.size());
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    // task recovered event is sent
-    assertTaskRecoveredEventSent(vertex1);
-
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-  }
-
-  /**
-   * vertex1 (New) -> StartRecoveryTransition <br>
-   * vertex2 (New) -> StartRecoveryTransition <br>
-   * vertex3 (New) -> RecoverTransition
-   * <p>
-   * Recovers vertex1 and vertex2 one after the other with desired state
-   * RUNNING, with no history restored for either. vertex3, which only has a
-   * VertexRecoverableEventsGeneratedEvent restored, is expected to stay in
-   * RECOVERING after vertex1 alone and to reach RUNNING once vertex2 has been
-   * recovered as well.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_RecoveringFromNew() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    assertEquals(1, vertex1.getTasks().size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    VertexState recoveredState =
-        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.NEW, recoveredState);
-    assertEquals(1, vertex3.recoveredEvents.size());
-
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex2.getState());
-    // no OutputCommitter for vertex2
-    assertNull(vertex2.getOutputCommitters());
-
-    // v3 goes to RUNNING because v1 and v2 have both started
-    assertEquals(VertexState.RUNNING, vertex3.getState());
-    assertEquals(2, vertex3.numRecoveredSourceVertices);
-    assertEquals(2, vertex3.numInitedSourceVertices);
-    assertEquals(2, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-    // RootInputDataInformation is removed
-    assertEquals(0, vertex3.recoveredEvents.size());
-
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex3);
-
-  }
-  
-  /**
-   * vertex1 (New) -> StartRecoveryTransition <br>
-   * vertex2 (New) -> RecoveryTransition <br>
-   * <p>
-   * Replaces the fixture DAG with one built from createDAGPlanMR(), initializes
-   * it, and verifies that both vertices start out NEW. Recovering vertex1 with
-   * desired state RUNNING is then expected to bring vertex1 and vertex2 to
-   * RUNNING.
-   */
-  @Test
-  public void testMRDAG() {
-    DAGPlan dagPlan = createDAGPlanMR();
-    dag =
-        new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
-            new Credentials(), new SystemClock(), user,
-            mock(TaskHeartbeatHandler.class), mockAppContext);
-    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
-
-    VertexImpl vertex1 = (VertexImpl)dag.getVertex("vertex1");
-    VertexImpl vertex2 = (VertexImpl)dag.getVertex("vertex2");
-    // both vertices must still be NEW before any recovery event is handled
-    assertEquals(VertexState.NEW, vertex1.getState());
-    assertEquals(VertexState.NEW, vertex2.getState());
-
-    // vertex1 handle RecoveryEvent at the state of NEW
-    // vertex 2 handle SourceVertexRecoveryEvent at the state of NEW
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    assertEquals(1, vertex1.getTasks().size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-    assertEquals(VertexState.RUNNING, vertex2.getState());
-  }
-
-  @Test(timeout = 5000)
-  public void testRecovery_VertexManagerErrorOnRecovery() {
-    // In order to simulate the behavior that VertexManagerError happens in 
recovering stage, need to start the recovering from
-    // the vertex and disable the event handling of the DAG (a mock handler is registered for DAG events).
-    dispatcher = new DrainDispatcher();
-    dispatcher.register(DAGEventType.class, mock(EventHandler.class));
-    vertexEventHandler = new VertexEventHanlder();
-    dispatcher.register(VertexEventType.class, vertexEventHandler);
-    taskEventHandler = new TaskEventHandler();
-    dispatcher.register(TaskEventType.class, taskEventHandler);
-    dispatcher.register(TaskAttemptEventType.class,
-        new TaskAttemptEventHandler());
-    dispatcher.init(new Configuration());
-    dispatcher.start();
-    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    DAGPlan dagPlan = createDAGPlan();
-    dag =
-        new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), 
mock(TaskCommunicatorManagerInterface.class),
-            new Credentials(), new SystemClock(), user,
-            mock(TaskHeartbeatHandler.class), mockAppContext);
-    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
-    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
-    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
-    dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", 
"dagName", null));
-    dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", 
"dagName"));
-    LOG.info("finish setUp");
-
-    /////////////////// Start the recover ////////////////////////
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    assertEquals(vertex1.getTotalTasks(), vertex1.getTasks().size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    VertexState recoveredState =
-        vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3
-            .getVertexId(), "vertex3", initRequestedTime, initedTime, 0, "",
-            null));
-    assertEquals(VertexState.INITED, recoveredState);
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    vertex3.handle(new VertexEventManagerUserCodeError(vertex3.getVertexId(), 
-        new AMUserCodeException(Source.VertexManager, new 
TezUncheckedException("test"))));
-
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    restoreFromInitializedEvent(vertex2);
-    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex2.getState());
-
-    // v3 is FAILED due to the user code error reported while it was RECOVERING
-    assertEquals(VertexState.FAILED, vertex3.getState());
-    Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, 
vertex3.getTerminationCause());
-    assertEquals(2, vertex3.numRecoveredSourceVertices);
-  }
-
-
-  /**
-   * vertex1 (New) -> restoreFromInitialized -> StartRecoveryTransition<br>
-   * vertex2 (New) -> restoreFromInitialized -> StartRecoveryTransition<br>
-   * vertex3 (New) -> restoreFromVertexInitedEvent -> RecoverTransition<br>
-   * <p>
-   * vertex3 additionally has a VertexRecoverableEventsGeneratedEvent restored
-   * before its initialized event. It is expected to stay in RECOVERING after
-   * vertex1 alone has been recovered and to reach RUNNING once vertex2 has been
-   * recovered too, after which a T_RECOVER event per task is expected for all
-   * three vertices.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_RecoveringFromInited() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    assertEquals(vertex1.getTotalTasks(), vertex1.getTasks().size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    VertexState recoveredState =
-        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.NEW, recoveredState);
-    assertEquals(1, vertex3.recoveredEvents.size());
-    recoveredState =
-        vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3
-            .getVertexId(), "vertex3", initRequestedTime, initedTime, 2, "",
-            null));
-    assertEquals(VertexState.INITED, recoveredState);
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    restoreFromInitializedEvent(vertex2);
-    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex2.getState());
-
-    // v3 goes to RUNNING because v1 and v2 have both started
-    assertEquals(VertexState.RUNNING, vertex3.getState());
-    assertEquals(2, vertex3.numRecoveredSourceVertices);
-    // numInitedSourceVertices is wrong but doesn't matter because v3 has
-    // already initialized
-    assertEquals(2, vertex3.numInitedSourceVertices);
-    assertEquals(2, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-    // RootInputDataInformation is removed
-    assertEquals(0, vertex3.recoveredEvents.size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex3);
-    // one T_RECOVER event per task is expected for each of the three vertices
-    assertTaskRecoveredEventSent(vertex1);
-    assertTaskRecoveredEventSent(vertex2);
-    assertTaskRecoveredEventSent(vertex3);
-  }
-
-  /**
-   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * StartRecoveryTransition <br>
-   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * StartRecoveryTransition <br>
-   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * RecoverTransition
-   * <p>
-   * vertex3 additionally has a VertexRecoverableEventsGeneratedEvent restored
-   * first. It is expected to stay in RECOVERING after vertex1 alone has been
-   * recovered and to reach RUNNING once vertex2 has been recovered too.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_RecoveringFromRunning() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-    VertexState recoveredState = vertex1.restoreFromEvent(new 
VertexStartedEvent(vertex1.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    assertEquals(1, vertex1.getTasks().size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    recoveredState =
-        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.NEW, recoveredState);
-    assertEquals(1, vertex3.recoveredEvents.size());
-    recoveredState =
-        vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3
-            .getVertexId(), "vertex3", initRequestedTime, initedTime, 
vertex3.getTotalTasks(), "",
-            null));
-    assertEquals(VertexState.INITED, recoveredState);
-    recoveredState =
-        vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(),
-            initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    recoveredState = vertex2.restoreFromEvent(new 
VertexInitializedEvent(vertex2.getVertexId(),
-        "vertex2", initRequestedTime, initedTime, vertex2.getTotalTasks(), "", 
null));
-    assertEquals(VertexState.INITED, recoveredState);
-    recoveredState = vertex2.restoreFromEvent(new 
VertexStartedEvent(vertex2.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-    
-    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex2.getState());
-
-    // v3 goes to RUNNING because v1 and v2 have both started
-    assertEquals(VertexState.RUNNING, vertex3.getState());
-    assertEquals(2, vertex3.numRecoveredSourceVertices);
-    assertEquals(2, vertex3.numInitedSourceVertices);
-    assertEquals(2, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-    // RootInputDataInformation is removed
-    assertEquals(0, vertex3.recoveredEvents.size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex3);
-
-    assertTaskRecoveredEventSent(vertex1);
-    assertTaskRecoveredEventSent(vertex2);
-    assertTaskRecoveredEventSent(vertex3);
-  }
-
-  /**
-   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * restoreFromVertexFinished -> StartRecoveryTransition<br>
-   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * StartRecoveryTransition<br>
-   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * RecoverTransition
-   * <p>
-   * Only vertex1 has a finished (SUCCEEDED) event restored in this test;
-   * vertex2 is restored up to its started event. vertex3 additionally has a
-   * VertexRecoverableEventsGeneratedEvent restored first, and is expected to
-   * reach RUNNING once both vertex1 and vertex2 have been recovered.
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_RecoveringFromSUCCEEDED() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-    VertexState recoveredState = vertex1.restoreFromEvent(new 
VertexStartedEvent(vertex1.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-
-    recoveredState = vertex1.restoreFromEvent(new 
VertexFinishedEvent(vertex1.getVertexId(),
-        "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
-        initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), 
null));
-    assertEquals(VertexState.SUCCEEDED, recoveredState);
-
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex1.getState());
-    assertEquals(1, vertex1.getTasks().size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex1);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    recoveredState =
-        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
-            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
-    assertEquals(VertexState.NEW, recoveredState);
-    assertEquals(1, vertex3.recoveredEvents.size());
-    restoreFromInitializedEvent(vertex3);
-    recoveredState =
-        vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(),
-            initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-    // vertex3 is still waiting for the recovery of vertex2
-    assertEquals(VertexState.RECOVERING, vertex3.getState());
-    assertEquals(1, vertex3.numRecoveredSourceVertices);
-    assertEquals(1, vertex3.numInitedSourceVertices);
-    assertEquals(1, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    recoveredState = vertex2.restoreFromEvent(new 
VertexInitializedEvent(vertex2.getVertexId(),
-        "vertex2", initRequestedTime, initedTime, vertex2.getTotalTasks(), "", 
null));
-    assertEquals(VertexState.INITED, recoveredState);
-    recoveredState = vertex2.restoreFromEvent(new 
VertexStartedEvent(vertex2.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
-        VertexState.RUNNING));
-    dispatcher.await();
-    assertEquals(VertexState.RUNNING, vertex2.getState());
-
-    // v3 goes to RUNNING because v1 and v2 have both started
-    assertEquals(VertexState.RUNNING, vertex3.getState());
-    assertEquals(2, vertex3.numRecoveredSourceVertices);
-    assertEquals(2, vertex3.numInitedSourceVertices);
-    assertEquals(2, vertex3.numStartedSourceVertices);
-    assertEquals(1, vertex3.getDistanceFromRoot());
-    // RootInputDataInformation is removed
-    assertEquals(0, vertex3.recoveredEvents.size());
-    // verify OutputCommitter is initialized
-    assertOutputCommitters(vertex3);
-
-    assertTaskRecoveredEventSent(vertex1);
-    assertTaskRecoveredEventSent(vertex2);
-    assertTaskRecoveredEventSent(vertex3);
-  }
-
-  /**
-   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * restoreFromVertexFinished (KILLED)
-   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * restoreFromVertexFinished (KILLED)
-   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * restoreFromVertexFinished (KILLED)
-   * <p>
-   * After the three vertices have been restored to KILLED, a
-   * DAGEventRecoverEvent is sent to the DAG. Every vertex and every one of its
-   * tasks is then expected to be in the KILLED state.
-  */
-  @Test(timeout = 5000)
-  public void testRecovery_KilledBeforeTaskStarted() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-    VertexState recoveredState = vertex1.restoreFromEvent(
-        new VertexStartedEvent(vertex1.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    recoveredState = vertex1.restoreFromEvent(
-        new VertexFinishedEvent(vertex1.getVertexId(),
-        "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
-        initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.KILLED, "", new TezCounters(), new VertexStats(), null));
-    assertEquals(VertexState.KILLED, recoveredState);
-
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    restoreFromInitializedEvent(vertex2);
-    recoveredState = vertex2.restoreFromEvent(
-        new VertexStartedEvent(vertex2.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    // the finished event for vertex2 must carry vertex2's own id
-    recoveredState = vertex2.restoreFromEvent(
-        new VertexFinishedEvent(vertex2.getVertexId(),
-        "vertex2", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
-        initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.KILLED, "", new TezCounters(), new VertexStats(), null));
-    assertEquals(VertexState.KILLED, recoveredState);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    restoreFromInitializedEvent(vertex3);
-    recoveredState = vertex3.restoreFromEvent(
-        new VertexStartedEvent(vertex3.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    recoveredState = vertex3.restoreFromEvent(
-        new VertexFinishedEvent(vertex3.getVertexId(),
-        "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
-        initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.KILLED, "", new TezCounters(), new VertexStats(), null));
-    assertEquals(VertexState.KILLED, recoveredState);
-
-    // start the recovering, send RecoverEvent to its root vertices (v1, v2)
-    dag.handle(new DAGEventRecoverEvent(dag.getID(), null));
-    dispatcher.await();
-    // recover v1 to KILLED directly and also its tasks are recovered to KILLED
-    assertEquals(VertexState.KILLED, vertex1.getState());
-    for (Task task : vertex1.tasks.values()) {
-      assertEquals(TaskState.KILLED, task.getState());
-    }
-    // recover v2 to KILLED directly and also its tasks are recovered to KILLED
-    assertEquals(VertexState.KILLED, vertex2.getState());
-    for (Task task : vertex2.tasks.values()) {
-      assertEquals(TaskState.KILLED, task.getState());
-    }
-    // recover v3 to KILLED directly and also its tasks are recovered to KILLED
-    assertEquals(VertexState.KILLED, vertex3.getState());
-    for (Task task : vertex3.tasks.values()) {
-      assertEquals(TaskState.KILLED, task.getState());
-    }
-  }
-
-  /**
-   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * restoreFromVertexFinished (FAILED)
-   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted
-   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
-   * restoreFromVertexFinished (FAILED)
-   * <p>
-   * After a DAGEventRecoverEvent is sent to the DAG, vertex1 and vertex3 (and
-   * their tasks) are expected to be FAILED, vertex2 to be KILLED, and the DAG
-   * itself to be FAILED.
-  */
-  @Test(timeout = 5000)
-  public void testRecovery_FailedBeforeTaskStarted() {
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    restoreFromInitializedEvent(vertex1);
-    VertexState recoveredState = vertex1.restoreFromEvent(new 
VertexStartedEvent(vertex1.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    recoveredState = vertex1.restoreFromEvent(new 
VertexFinishedEvent(vertex1.getVertexId(),
-        "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
-        initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.FAILED, "", new TezCounters(), new VertexStats(), null));
-    assertEquals(VertexState.FAILED, recoveredState);
-
-    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
-    restoreFromInitializedEvent(vertex2);
-    recoveredState = vertex2.restoreFromEvent(new 
VertexStartedEvent(vertex2.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    assertEquals(VertexState.RUNNING, recoveredState);
-
-    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
-    restoreFromInitializedEvent(vertex3);
-    recoveredState = vertex3.restoreFromEvent(new 
VertexStartedEvent(vertex3.getVertexId(),
-        initRequestedTime + 100L, initRequestedTime + 200L));
-    recoveredState = vertex3.restoreFromEvent(new 
VertexFinishedEvent(vertex3.getVertexId(),
-        "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
-        initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.FAILED, "", new TezCounters(), new VertexStats(), null));
-    assertEquals(VertexState.FAILED, recoveredState);
-
-    // start the recovering from DAG
-    dag.handle(new DAGEventRecoverEvent(dag.getID(), null));
-    dispatcher.await();
-    // recover v1 to FAILED directly and also its tasks are recovered to FAILED
-    assertEquals(VertexState.FAILED, vertex1.getState());
-    for (Task task : vertex1.tasks.values()) {
-      assertEquals(TaskState.FAILED, task.getState());
-    }
-    // recover v2 to KILLED finally due to v1/v3 failed will cause dag failed 
which result in
-    // dag kill all its vertices
-    assertEquals(VertexState.KILLED, vertex2.getState());
-
-    // recover v3 to FAILED directly and also its tasks are recovered to FAILED
-    assertEquals(VertexState.FAILED, vertex3.getState());
-    for (Task task : vertex3.tasks.values()) {
-      assertEquals(TaskState.FAILED, task.getState());
-    }
-    assertEquals(DAGState.FAILED, dag.getState());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 57a849b..14aed3d 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -61,24 +61,29 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+
 public class TestHistoryEventsProtoConversion {
 
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -270,10 +275,12 @@ public class TestHistoryEventsProtoConversion {
   }
 
   private void testVertexInitializedEvent() throws Exception {
+    List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+        new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap(new byte[0])), null));
     VertexInitializedEvent event = new VertexInitializedEvent(
         TezVertexID.getInstance(
             TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
-        "vertex1", 1000l, 15000l, 100, "procName", null);
+        "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents);
     VertexInitializedEvent deserializedEvent = (VertexInitializedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -286,6 +293,11 @@ public class TestHistoryEventsProtoConversion {
     Assert.assertEquals(event.getAdditionalInputs(),
         deserializedEvent.getAdditionalInputs());
     Assert.assertNull(deserializedEvent.getProcessorName());
+    Assert.assertEquals(1, event.getInitGeneratedEvents().size());
+    Assert.assertEquals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT,
+        event.getInitGeneratedEvents().get(0).getEventType());
+    Assert.assertEquals(event.getInitGeneratedEvents().size(),
+        deserializedEvent.getInitGeneratedEvents().size());
     logEvents(event, deserializedEvent);
   }
 
@@ -304,85 +316,72 @@ public class TestHistoryEventsProtoConversion {
     logEvents(event, deserializedEvent);
   }
 
-  private void testVertexParallelismUpdatedEvent() throws Exception {
-    {
-      InputSpecUpdate rootInputSpecUpdateBulk = InputSpecUpdate
-          .createAllTaskInputSpecUpdate(2);
-      InputSpecUpdate rootInputSpecUpdatePerTask = InputSpecUpdate
-          .createPerTaskInputSpecUpdate(Lists.newArrayList(1, 2, 3));
-      Map<String, InputSpecUpdate> rootInputSpecUpdates = new HashMap<String, 
InputSpecUpdate>();
-      rootInputSpecUpdates.put("input1", rootInputSpecUpdateBulk);
-      rootInputSpecUpdates.put("input2", rootInputSpecUpdatePerTask);
-      VertexParallelismUpdatedEvent event =
-          new VertexParallelismUpdatedEvent(
-              TezVertexID.getInstance(
-                  TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 
111),
-              100, null, null, rootInputSpecUpdates, 1);
-      VertexParallelismUpdatedEvent deserializedEvent = 
(VertexParallelismUpdatedEvent)
+  private void testVertexReconfigureDoneEvent() throws Exception {
+    VertexLocationHint vertexLocationHint = VertexLocationHint.create(new ArrayList<TaskLocationHint>());
+    InputSpecUpdate rootInputSpecUpdateBulk = InputSpecUpdate
+        .createAllTaskInputSpecUpdate(2);
+    InputSpecUpdate rootInputSpecUpdatePerTask = InputSpecUpdate
+        .createPerTaskInputSpecUpdate(Lists.newArrayList(1, 2, 3));
+    Map<String, InputSpecUpdate> rootInputSpecUpdates = new HashMap<String, InputSpecUpdate>();
+    rootInputSpecUpdates.put("input1", rootInputSpecUpdateBulk);
+    rootInputSpecUpdates.put("input2", rootInputSpecUpdatePerTask);
+    
+    Map<String, EdgeProperty> sourceEdgeManagers
+      = new HashMap<String, EdgeProperty>();
+    // add standard and custom edge
+    sourceEdgeManagers.put("foo", EdgeProperty.create(DataMovementType.SCATTER_GATHER, 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+        OutputDescriptor.create("Out1"), InputDescriptor.create("in1")));
+    sourceEdgeManagers.put("foo1", EdgeProperty.create(EdgeManagerPluginDescriptor.create("bar1")
+        .setUserPayload(
+            UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100)), 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+        OutputDescriptor.create("Out1"), InputDescriptor.create("in1")));
+
+    VertexConfigurationDoneEvent event =
+        new VertexConfigurationDoneEvent(
+            TezVertexID.getInstance(
+                TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+            100, 2, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, true);
+    VertexConfigurationDoneEvent deserializedEvent = (VertexConfigurationDoneEvent)
           testProtoConversion(event);
-      Assert.assertEquals(event.getVertexID(), 
deserializedEvent.getVertexID());
-      Assert.assertEquals(event.getNumTasks(), 
deserializedEvent.getNumTasks());
-      Assert.assertEquals(event.getSourceEdgeProperties(),
-          deserializedEvent.getSourceEdgeProperties());
-      Assert.assertEquals(event.getVertexLocationHint(),
-          deserializedEvent.getVertexLocationHint());
-      Assert.assertEquals(event.getRootInputSpecUpdates().size(), 
deserializedEvent
-          .getRootInputSpecUpdates().size());
-      InputSpecUpdate deserializedBulk = 
deserializedEvent.getRootInputSpecUpdates().get("input1");
-      InputSpecUpdate deserializedPerTask = 
deserializedEvent.getRootInputSpecUpdates().get("input2");
-      Assert.assertEquals(rootInputSpecUpdateBulk.isForAllWorkUnits(),
-          deserializedBulk.isForAllWorkUnits());
-      Assert.assertEquals(rootInputSpecUpdateBulk.getAllNumPhysicalInputs(),
-          deserializedBulk.getAllNumPhysicalInputs());
-      Assert.assertEquals(rootInputSpecUpdatePerTask.isForAllWorkUnits(),
-          deserializedPerTask.isForAllWorkUnits());
-      Assert.assertEquals(rootInputSpecUpdatePerTask.getAllNumPhysicalInputs(),
-          deserializedPerTask.getAllNumPhysicalInputs());
-      logEvents(event, deserializedEvent);
-    }
-    {
-      Map<String, EdgeProperty> sourceEdgeManagers
-          = new LinkedHashMap<String, EdgeProperty>();
-      // add standard and custom edge
-      sourceEdgeManagers.put("foo", 
EdgeProperty.create(DataMovementType.SCATTER_GATHER, 
-          DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-          OutputDescriptor.create("Out1"), InputDescriptor.create("in1")));
-      sourceEdgeManagers.put("foo1", 
EdgeProperty.create(EdgeManagerPluginDescriptor.create("bar1")
-          .setUserPayload(
-              UserPayload.create(ByteBuffer.wrap(new 
String("payload").getBytes()), 100)), 
-          DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-          OutputDescriptor.create("Out1"), InputDescriptor.create("in1")));
-      VertexParallelismUpdatedEvent event =
-          new VertexParallelismUpdatedEvent(
-              TezVertexID.getInstance(
-                  TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 
111),
-              100, 
VertexLocationHint.create(Arrays.asList(TaskLocationHint.createTaskLocationHint(
-              new HashSet<String>(Arrays.asList("h1")),
-              new HashSet<String>(Arrays.asList("r1"))))),
-              sourceEdgeManagers, null, 1);
-
-      VertexParallelismUpdatedEvent deserializedEvent = 
-          (VertexParallelismUpdatedEvent) testProtoConversion(event);
-      Assert.assertEquals(event.getVertexID(), 
deserializedEvent.getVertexID());
-      Assert.assertEquals(event.getNumTasks(), 
deserializedEvent.getNumTasks());
-      Assert.assertEquals(event.getSourceEdgeProperties().size(), 
deserializedEvent
-          .getSourceEdgeProperties().size());
-      
Assert.assertEquals(event.getSourceEdgeProperties().get("foo").getDataMovementType(),
-          
deserializedEvent.getSourceEdgeProperties().get("foo").getDataMovementType());
-      Assert.assertNull(deserializedEvent.getSourceEdgeProperties().get("foo")
-          .getEdgeManagerDescriptor());
-      
Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getDataMovementType(),
-          
deserializedEvent.getSourceEdgeProperties().get("foo1").getDataMovementType());
-      
Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor()
-          .getUserPayload().getVersion(), 
deserializedEvent.getSourceEdgeProperties().get("foo1")
-          .getEdgeManagerDescriptor().getUserPayload().getVersion());
-      Assert.assertArrayEquals(event.getSourceEdgeProperties().get("foo1")
-          .getEdgeManagerDescriptor().getUserPayload().deepCopyAsArray(), 
deserializedEvent
-          
.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor().getUserPayload()
-          .deepCopyAsArray());
-      Assert.assertEquals(event.getVertexLocationHint(), 
deserializedEvent.getVertexLocationHint());
-      logEvents(event, deserializedEvent);
-    }
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+    Assert.assertEquals(event.isSetParallelismCalled(), deserializedEvent.isSetParallelismCalled());
+    // vertexLocationHint
+    Assert.assertEquals(event.getVertexLocationHint(),
+        deserializedEvent.getVertexLocationHint());
+    // rootInputSpec
+    Assert.assertEquals(event.getRootInputSpecUpdates().size(), deserializedEvent
+        .getRootInputSpecUpdates().size());
+    InputSpecUpdate deserializedBulk = deserializedEvent.getRootInputSpecUpdates().get("input1");
+    InputSpecUpdate deserializedPerTask = deserializedEvent.getRootInputSpecUpdates().get("input2");
+    Assert.assertEquals(rootInputSpecUpdateBulk.isForAllWorkUnits(),
+        deserializedBulk.isForAllWorkUnits());
+    Assert.assertEquals(rootInputSpecUpdateBulk.getAllNumPhysicalInputs(),
+        deserializedBulk.getAllNumPhysicalInputs());
+    Assert.assertEquals(rootInputSpecUpdatePerTask.isForAllWorkUnits(),
+        deserializedPerTask.isForAllWorkUnits());
+    Assert.assertEquals(rootInputSpecUpdatePerTask.getAllNumPhysicalInputs(),
+        deserializedPerTask.getAllNumPhysicalInputs());
+    // sourceEdgeManager
+    Assert.assertEquals(event.getSourceEdgeProperties().size(), deserializedEvent
+        .getSourceEdgeProperties().size());
+    Assert.assertEquals(event.getSourceEdgeProperties().get("foo").getDataMovementType(),
+        deserializedEvent.getSourceEdgeProperties().get("foo").getDataMovementType());
+    Assert.assertNull(deserializedEvent.getSourceEdgeProperties().get("foo")
+        .getEdgeManagerDescriptor());
+    Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getDataMovementType(),
+        deserializedEvent.getSourceEdgeProperties().get("foo1").getDataMovementType());
+    Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor()
+        .getUserPayload().getVersion(), deserializedEvent.getSourceEdgeProperties().get("foo1")
+        .getEdgeManagerDescriptor().getUserPayload().getVersion());
+    Assert.assertArrayEquals(event.getSourceEdgeProperties().get("foo1")
+        .getEdgeManagerDescriptor().getUserPayload().deepCopyAsArray(), deserializedEvent
+        .getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor().getUserPayload()
+        .deepCopyAsArray());
+
+    logEvents(event, deserializedEvent);
   }
 
   private void testVertexFinishedEvent() throws Exception {
@@ -500,7 +499,7 @@ public class TestHistoryEventsProtoConversion {
           
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 
1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          null, null, null, null, 2048,
+          null, null, null, null, null, 2048,
           
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 
1), 0), 1024);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
@@ -536,7 +535,8 @@ public class TestHistoryEventsProtoConversion {
           
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 
1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new 
TezCounters(), events, 0, null, 0);
+          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events,
+          null, 0, null, 0);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -594,35 +594,6 @@ public class TestHistoryEventsProtoConversion {
     logEvents(event, deserializedEvent);
   }
 
-  private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
-    VertexRecoverableEventsGeneratedEvent event;
-    try {
-      event = new VertexRecoverableEventsGeneratedEvent(
-          TezVertexID.getInstance(
-              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), 
null);
-      fail("Invalid creation should have errored out");
-    } catch (RuntimeException e) {
-      // Expected
-    }
-    long eventTime = 1024;
-    List<TezEvent> events =
-        Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
-            new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", 
null), eventTime));
-    event = new VertexRecoverableEventsGeneratedEvent(
-            TezVertexID.getInstance(
-                TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), 
events);
-    VertexRecoverableEventsGeneratedEvent deserializedEvent =
-        (VertexRecoverableEventsGeneratedEvent) testProtoConversion(event);
-    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
-    Assert.assertEquals(1,
-        deserializedEvent.getTezEvents().size());
-    Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
-        deserializedEvent.getTezEvents().get(0).getEventType());
-    Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(),
-        deserializedEvent.getTezEvents().get(0).getEventReceivedTime());
-    logEvents(event, deserializedEvent);
-  }
-
   private void testDAGCommitStartedEvent() throws Exception {
     DAGCommitStartedEvent event = new DAGCommitStartedEvent(
         TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100l);
@@ -643,15 +614,21 @@ public class TestHistoryEventsProtoConversion {
   }
 
   private void testVertexGroupCommitStartedEvent() throws Exception {
+    TezVertexID vertexId1 = TezVertexID.getInstance(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 1);
+    TezVertexID vertexId2 = TezVertexID.getInstance(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 2);
+    Collection<TezVertexID> vertexIds = Lists.newArrayList(vertexId1, vertexId2);
     VertexGroupCommitStartedEvent event = new VertexGroupCommitStartedEvent(
         TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
-        "fooGroup", 1000344l);
+        "fooGroup", vertexIds, 1000344l);
     {
       VertexGroupCommitStartedEvent deserializedEvent =
           (VertexGroupCommitStartedEvent) testProtoConversion(event);
       Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
       Assert.assertEquals(event.getVertexGroupName(),
           deserializedEvent.getVertexGroupName());
+      Assert.assertEquals(event.getVertexIds(), vertexIds);
       logEvents(event, deserializedEvent);
     }
     {
@@ -664,15 +641,21 @@ public class TestHistoryEventsProtoConversion {
   }
 
   private void testVertexGroupCommitFinishedEvent() throws Exception {
+    TezVertexID vertexId1 = TezVertexID.getInstance(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 1);
+    TezVertexID vertexId2 = TezVertexID.getInstance(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 0), 2);
+    Collection<TezVertexID> vertexIds = Lists.newArrayList(vertexId1, vertexId2);
     VertexGroupCommitFinishedEvent event = new VertexGroupCommitFinishedEvent(
         TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
-        "fooGroup", 1000344l);
+        "fooGroup", vertexIds, 1000344l);
     {
       VertexGroupCommitFinishedEvent deserializedEvent =
           (VertexGroupCommitFinishedEvent) testProtoConversion(event);
       Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
       Assert.assertEquals(event.getVertexGroupName(),
           deserializedEvent.getVertexGroupName());
+      Assert.assertEquals(event.getVertexIds(), vertexIds);
       logEvents(event, deserializedEvent);
     }
     {
@@ -685,7 +668,7 @@ public class TestHistoryEventsProtoConversion {
   }
 
 
-  @Test(timeout = 5000)
+  @Test//(timeout = 5000)
   public void testDefaultProtoConversion() throws Exception {
     for (HistoryEventType eventType : HistoryEventType.values()) {
       switch (eventType) {
@@ -716,8 +699,8 @@ public class TestHistoryEventsProtoConversion {
         case VERTEX_STARTED:
           testVertexStartedEvent();
           break;
-        case VERTEX_PARALLELISM_UPDATED:
-          testVertexParallelismUpdatedEvent();
+        case VERTEX_CONFIGURE_DONE:
+          testVertexReconfigureDoneEvent();
           break;
         case VERTEX_FINISHED:
           testVertexFinishedEvent();
@@ -740,9 +723,6 @@ public class TestHistoryEventsProtoConversion {
         case CONTAINER_STOPPED:
           testContainerStoppedEvent();
           break;
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          testVertexDataMovementEventsGeneratedEvent();
-          break;
         case DAG_COMMIT_STARTED:
           testDAGCommitStartedEvent();
           break;

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index bcc3859..606fb85 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -59,12 +59,11 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -139,13 +138,13 @@ public class TestHistoryEventJsonConversion {
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", 
random.nextInt(), random.nextInt(),
-              random.nextInt(), "proc", null);
+              random.nextInt(), "proc", null, null);
           break;
         case VERTEX_STARTED:
           event = new VertexStartedEvent(tezVertexID, random.nextInt(), 
random.nextInt());
           break;
-        case VERTEX_PARALLELISM_UPDATED:
-          event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, 
null, null, 10);
+        case VERTEX_CONFIGURE_DONE:
+          event = new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, null, null, true);
           break;
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", 1, 
random.nextInt(), random.nextInt(),
@@ -165,7 +164,8 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", 
random.nextInt(),
-              random.nextInt(), TaskAttemptState.KILLED, 
TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null, 0, null, 0);
+              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
+              null, null, null, null, 0, null, 0);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -174,9 +174,6 @@ public class TestHistoryEventJsonConversion {
         case CONTAINER_STOPPED:
           event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, 
applicationAttemptId);
           break;
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          event = new VertexRecoverableEventsGeneratedEvent();
-          break;
         case DAG_COMMIT_STARTED:
           event = new DAGCommitStartedEvent();
           break;
@@ -207,7 +204,7 @@ public class TestHistoryEventJsonConversion {
   }
 
   @Test(timeout = 5000)
-  public void testConvertVertexParallelismUpdatedEvent() throws JSONException {
+  public void testConvertVertexReconfigureDoneEvent() throws JSONException {
     TezVertexID vId = TezVertexID.getInstance(
         TezDAGID.getInstance(
             ApplicationId.newInstance(1l, 1), 1), 1);
@@ -217,8 +214,8 @@ public class TestHistoryEventJsonConversion {
     edgeMgrs.put("a", 
EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class")
         .setHistoryText("text"), DataSourceType.PERSISTED, 
SchedulingType.SEQUENTIAL,
         OutputDescriptor.create("Out"), InputDescriptor.create("In")));
-    VertexParallelismUpdatedEvent event = new 
VertexParallelismUpdatedEvent(vId, 1, null,
-        edgeMgrs, null, 10);
+    VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null,
+        edgeMgrs, null, true);
 
     JSONObject jsonObject = HistoryEventJsonConversion.convertToJson(event);
     Assert.assertNotNull(jsonObject);
@@ -229,12 +226,11 @@ public class TestHistoryEventJsonConversion {
     Assert.assertEquals(1, events.length());
 
     JSONObject evt = events.getJSONObject(0);
-    Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(),
+    Assert.assertEquals(HistoryEventType.VERTEX_CONFIGURE_DONE.name(),
         evt.getString(ATSConstants.EVENT_TYPE));
 
     JSONObject evtInfo = evt.getJSONObject(ATSConstants.EVENT_INFO);
     Assert.assertEquals(1, evtInfo.getInt(ATSConstants.NUM_TASKS));
-    Assert.assertEquals(10, evtInfo.getInt(ATSConstants.OLD_NUM_TASKS));
     
Assert.assertNotNull(evtInfo.getJSONObject(ATSConstants.UPDATED_EDGE_MANAGERS));
 
     JSONObject updatedEdgeMgrs = 
evtInfo.getJSONObject(ATSConstants.UPDATED_EDGE_MANAGERS);
@@ -247,9 +243,6 @@ public class TestHistoryEventJsonConversion {
     Assert.assertEquals("In", 
updatedEdgeMgr.getString(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
     Assert.assertEquals("a.class", 
updatedEdgeMgr.getString(DAGUtils.EDGE_MANAGER_CLASS_KEY));
 
-    JSONObject otherInfo = jsonObject.getJSONObject(ATSConstants.OTHER_INFO);
-    Assert.assertEquals(1, otherInfo.getInt(ATSConstants.NUM_TASKS));
-
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 5922100..281eaa9 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.cli.Options;
@@ -32,6 +33,7 @@ import org.apache.tez.client.CallerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -263,4 +265,14 @@ public abstract class TezExampleBase extends Configured implements Tool {
    */
   protected abstract int runJob(String[] args, TezConfiguration tezConf,
                                 TezClient tezClient) throws Exception;
+  
+  @Private
+  @VisibleForTesting
+  public ApplicationId getAppId() {
+    if (tezClientInternal == null) {
+      LOG.warn("TezClient is not initialized, return null for AppId");
+      return null;
+    }
+    return tezClientInternal.getAppMasterApplicationId();
+  }
 }

Reply via email to