This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c24f0  TEZ-4140: Tez DAG Recovery: Discrepancy In Scheduling Vertices During Vertex Recovery (Syed Shameerur Rahman via László Bodor)
e7c24f0 is described below

commit e7c24f06e220cb707f114b4f5cc7210d27cce72d
Author: Syed Shameerur Rahman <[email protected]>
AuthorDate: Sun Apr 19 10:12:09 2020 +0200

    TEZ-4140: Tez DAG Recovery: Discrepancy In Scheduling Vertices During Vertex Recovery (Syed Shameerur Rahman via László Bodor)
    
    Signed-off-by: Laszlo Bodor <[email protected]>
---
 .../org/apache/tez/dag/app/RecoveryParser.java     |   5 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |  22 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java      | 259 +++++++++++++++++++--
 3 files changed, 268 insertions(+), 18 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index bab6142..dfb7f61 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -74,7 +74,6 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.common.Preconditions;
@@ -971,6 +970,10 @@ public class RecoveryParser {
       return vertexInitedEvent != null && vertexConfigurationDoneEvent != null;
     }
 
+    public boolean isVertexTasksStarted() {
+      return taskRecoveryDataMap != null && !taskRecoveryDataMap.isEmpty();
+    }
+
     public boolean isVertexStarted() {
       return vertexStartedEvent != null;
     }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f0a8642..b67809e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -231,6 +231,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private final AppContext appContext;
   private final DAG dag;
   private final VertexRecoveryData recoveryData;
+  private boolean isVertexInitSkipped = false;
   private List<TezEvent> initGeneratedEvents = new ArrayList<TezEvent>();
   // set it to be true when setParallelism is called(used for recovery) 
   private boolean setParallelismCalledFlag = false;
@@ -2804,6 +2805,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return VertexState.INITED;
   }
 
+  private boolean isVertexInitSkippedInParentVertices() {
+    for (Map.Entry<Vertex, Edge> entry : sourceVertices.entrySet()) {
+      if(!(((VertexImpl) entry.getKey()).isVertexInitSkipped())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void assignVertexManager() throws TezException {
     // condition for skip initializing stage
     //   - VertexInputInitializerEvent is seen
@@ -2816,8 +2826,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     //        -  Why using VertexReconfigureDoneEvent
     //           -  VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex
     //              VertexReconfigureDoneEvent will be logged
-    if (recoveryData != null
-        && recoveryData.shouldSkipInit()) {
+    //   - TaskStartEvent is seen in that vertex
+    //   - All the parent vertices have skipped initializing stage while recovering
+    if (recoveryData != null && recoveryData.shouldSkipInit()
+        && recoveryData.isVertexTasksStarted() && 
isVertexInitSkippedInParentVertices()) {
       // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
       VertexConfigurationDoneEvent reconfigureDoneEvent = 
recoveryData.getVertexConfigurationDoneEvent();
       if (LOG.isInfoEnabled()) {
@@ -2837,6 +2849,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           
VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName())
             
.setUserPayload(UserPayload.create(ByteBuffer.wrap(out.toByteArray()))),
           dagUgi, this, appContext, stateChangeNotifier);
+      isVertexInitSkipped = true;
       return;
     }
 
@@ -4666,6 +4679,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return this.vertexManager;
   }
 
+  public boolean isVertexInitSkipped() {
+    return isVertexInitSkipped;
+  }
+
+
   private static void logLocationHints(String vertexName,
       VertexLocationHint locationHint) {
     if (locationHint == null) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 260bd42..fcf6db8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -176,14 +176,23 @@ public class TestDAGRecovery {
   private TezVertexID v2Id;
   private TezTaskID t1v2Id;
   private TezTaskAttemptID ta1t1v2Id;
+  private TezVertexID v3Id;
+  private TezTaskID t1v3Id;
+  private TezTaskAttemptID ta1t1v3Id;
 
   ////////////////////////
   private Random rand = new Random();
   private long dagInitedTime = System.currentTimeMillis() + rand.nextInt(100);
   private long dagStartedTime = dagInitedTime + rand.nextInt(100);
   private long v1InitedTime = dagStartedTime + rand.nextInt(100);
+  private long v2InitedTime = dagStartedTime + rand.nextInt(100);
+  private long v3InitedTime = Math.max(v1InitedTime, v2InitedTime) + 
rand.nextInt(100);
   private long v1StartedTime = v1InitedTime + rand.nextInt(100);
+  private long v2StartedTime = v2InitedTime + rand.nextInt(100);
+  private long v3StartedTime = v3InitedTime + rand.nextInt(100);
   private int v1NumTask = 10;
+  private int v2NumTask = 5;
+  private int v3NumTask = 2;
   private long t1StartedTime = v1StartedTime + rand.nextInt(100);
   private long t1FinishedTime = t1StartedTime + rand.nextInt(100);
   private long ta1LaunchTime = t1StartedTime + rand.nextInt(100);
@@ -354,6 +363,9 @@ public class TestDAGRecovery {
     v2Id = TezVertexID.getInstance(dagId, 1);
     t1v2Id = TezTaskID.getInstance(v2Id, 0);
     ta1t1v2Id = TezTaskAttemptID.getInstance(t1v2Id, 0);
+    v3Id = TezVertexID.getInstance(dagId, 2);
+    t1v3Id = TezTaskID.getInstance(v3Id, 0);
+    ta1t1v3Id = TezTaskAttemptID.getInstance(t1v3Id, 0);
 
     dispatcher.register(CallableEventType.class, new 
CallableEventDispatcher());
     taskEventDispatcher = new TaskEventDispatcher();
@@ -724,7 +736,7 @@ public class TestDAGRecovery {
    *  DAG:  DAGInitedEvent -> DAGStartedEvent 
    *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent
    * 
-   * V1 skip initialization. 
+   * Reinitialize V1 again.
    */
   @Test//(timeout=5000)
   public void testVertexRecoverFromInitedAndReconfigureDone() {
@@ -751,35 +763,75 @@ public class TestDAGRecovery {
     VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
     VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
     assertEquals(DAGState.RUNNING, dag.getState());
-    // v1 skip initialization
-    assertEquals(VertexState.RUNNING, v1.getState());
-    assertEquals(v1InitedTime, v1.initedTime);
-    assertEquals(v1NumTask, v1.getTotalTasks());
+    // reinitialize v1
+    assertEquals(VertexState.INITIALIZING, v1.getState());
     assertEquals(VertexState.RUNNING, v2.getState());
-    assertEquals(VertexState.RUNNING, v3.getState());
+    assertEquals(VertexState.INITED, v3.getState());
   }
-  
+
   /**
    * RecoveryEvents:
    *  DAG:  DAGInitedEvent -> DAGStartedEvent 
    *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent
-   * 
-   * V1 skip initialization. 
+   *
+   * Reinitialize V1 again.
    */
   @Test(timeout=5000)
   public void testVertexRecoverFromStart() {
-    initMockDAGRecoveryDataForVertex();   
+    initMockDAGRecoveryDataForVertex();
     List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
-    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
-        "vertex1", 0L, v1InitedTime, 
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+        "vertex1", 0L, v1InitedTime,
         v1NumTask, "", null, inputGeneratedTezEvents, null);
-    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v1Id, 
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v1Id,
         0L, v1NumTask, null, null, null, true);
     VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, 
v1StartedTime);
     VertexRecoveryData vertexRecoveryData = new 
VertexRecoveryData(v1InitedEvent,
         v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, 
TaskRecoveryData>(), false);
     
doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
-    
+
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, 
dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
+
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // reinitialize v1
+    assertEquals(VertexState.INITIALIZING, v1.getState());
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(VertexState.INITED, v3.getState());
+  }
+
+  /**
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent
+   *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
+   *
+   * V1 skip initialization.
+   */
+  @Test(timeout=5000)
+  public void testVertexRecoverFromVertexTaskStart() {
+    initMockDAGRecoveryDataForVertex();
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+        "vertex1", 0L, v1InitedTime,
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v1Id,
+        0L, v1NumTask, null, null, null, true);
+    VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, 
v1StartedTime);
+
+    TaskStartedEvent taskStartedEvent = new TaskStartedEvent(t1v1Id, "v1", 0L, 
0L);
+    TaskRecoveryData taskRecoveryData = new TaskRecoveryData(taskStartedEvent, 
null, null);
+    Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    taskRecoveryDataMap.put(t1v2Id, taskRecoveryData);
+
+    VertexRecoveryData vertexRecoveryData = new 
VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, v1StartedEvent, null, taskRecoveryDataMap, 
false);
+    
doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
     DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, 
dagRecoveryData);
     dag.handle(recoveryEvent);
     dispatcher.await();
@@ -796,6 +848,178 @@ public class TestDAGRecovery {
     assertEquals(VertexState.RUNNING, v2.getState());
     assertEquals(VertexState.RUNNING, v3.getState());
   }
+
+  /**
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent
+   *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
+   *  V2:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
+   *  V3:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
+   *
+   * V1 skip initialization.
+   * V2 skip initialization.
+   * V3 skip initialization.
+   */
+  @Test(timeout=5000)
+  public void testMultipleVertexRecoverFromVertexTaskStart() {
+    initMockDAGRecoveryDataForVertex();
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+        "vertex1", 0L, v1InitedTime,
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
+    VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
+        "vertex2", 0L, v2InitedTime,
+        v2NumTask, "", null, inputGeneratedTezEvents, null);
+    VertexInitializedEvent v3InitedEvent = new VertexInitializedEvent(v3Id,
+        "vertex3", 0L, v3InitedTime,
+        v3NumTask, "", null, inputGeneratedTezEvents, null);
+
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v1Id,
+        0L, v1NumTask, null, null, null, true);
+    VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v2Id,
+        0L, v2NumTask, null, null, null, true);
+    VertexConfigurationDoneEvent v3ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v3Id,
+        0L, v3NumTask, null, null, null, true);
+
+    VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, 
v1StartedTime);
+    VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 
v2StartedTime);
+    VertexStartedEvent v3StartedEvent = new VertexStartedEvent(v3Id, 0L, 
v3StartedTime);
+
+    TaskStartedEvent v1taskStartedEvent = new TaskStartedEvent(t1v1Id, 
"vertex1", 0L, 0L);
+    TaskRecoveryData v1taskRecoveryData = new 
TaskRecoveryData(v1taskStartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> v1taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    v1taskRecoveryDataMap.put(t1v1Id, v1taskRecoveryData);
+
+    TaskStartedEvent v2taskStartedEvent = new TaskStartedEvent(t1v2Id, 
"vertex2", 0L, 0L);
+    TaskRecoveryData v2taskRecoveryData = new 
TaskRecoveryData(v2taskStartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> v2taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    v2taskRecoveryDataMap.put(t1v2Id, v2taskRecoveryData);
+
+    TaskStartedEvent v3taskStartedEvent = new TaskStartedEvent(t1v3Id, 
"vertex3", 0L, 0L);
+    TaskRecoveryData v3taskRecoveryData = new 
TaskRecoveryData(v3taskStartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> v3taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    v3taskRecoveryDataMap.put(t1v3Id, v3taskRecoveryData);
+
+    VertexRecoveryData vertex1RecoveryData = new 
VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, v1StartedEvent, null, v1taskRecoveryDataMap, 
false);
+    VertexRecoveryData vertex2RecoveryData = new 
VertexRecoveryData(v2InitedEvent,
+        v2ReconfigureDoneEvent, v2StartedEvent, null, v2taskRecoveryDataMap, 
false);
+    VertexRecoveryData vertex3RecoveryData = new 
VertexRecoveryData(v3InitedEvent,
+        v3ReconfigureDoneEvent, v3StartedEvent, null, v3taskRecoveryDataMap, 
false);
+
+    
doReturn(vertex1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    
doReturn(vertex2RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v2Id);
+    
doReturn(vertex3RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v3Id);
+
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, 
dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
+
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+
+    assertEquals(DAGState.RUNNING, dag.getState());
+
+    // v1 skip initialization
+    assertEquals(VertexState.RUNNING, v1.getState());
+    assertEquals(v1InitedTime, v1.initedTime);
+    assertEquals(v1StartedTime, v1.startedTime);
+    assertEquals(v1NumTask, v1.getTotalTasks());
+
+    // v2 skip initialization
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(v2InitedTime, v2.initedTime);
+    assertEquals(v2StartedTime, v2.startedTime);
+    assertEquals(v2NumTask, v2.getTotalTasks());
+
+    // v3 skip initialization
+    assertEquals(VertexState.RUNNING, v3.getState());
+    assertEquals(v3InitedTime, v3.initedTime);
+    assertEquals(v3StartedTime, v3.startedTime);
+    assertEquals(v3NumTask, v3.getTotalTasks());
+  }
+
+  /**
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent
+   *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent
+   *  V2:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
+   *  V3:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
+   *
+   *  Reinitialize V1 again.
+   *  V2 skip initialization.
+   *  Reinitialize V3 again. Since V3 is dependent on V1
+   */
+  @Test(timeout=5000)
+  public void testMultipleVertexRecoverFromVertex() {
+    initMockDAGRecoveryDataForVertex();
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+        "vertex1", 0L, v1InitedTime,
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
+    VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
+        "vertex2", 0L, v2InitedTime,
+        v2NumTask, "", null, inputGeneratedTezEvents, null);
+    VertexInitializedEvent v3InitedEvent = new VertexInitializedEvent(v3Id,
+        "vertex3", 0L, v3InitedTime,
+        v3NumTask, "", null, inputGeneratedTezEvents, null);
+
+    VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v2Id,
+        0L, v2NumTask, null, null, null, true);
+    VertexConfigurationDoneEvent v3ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v3Id,
+        0L, v3NumTask, null, null, null, true);
+
+    VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 
v2StartedTime);
+    VertexStartedEvent v3StartedEvent = new VertexStartedEvent(v3Id, 0L, 
v3StartedTime);
+
+    TaskStartedEvent v2taskStartedEvent = new TaskStartedEvent(t1v2Id, 
"vertex2", 0L, 0L);
+    TaskRecoveryData v2taskRecoveryData = new 
TaskRecoveryData(v2taskStartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> v2taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    v2taskRecoveryDataMap.put(t1v2Id, v2taskRecoveryData);
+
+    TaskStartedEvent v3taskStartedEvent = new TaskStartedEvent(t1v3Id, 
"vertex3", 0L, 0L);
+    TaskRecoveryData v3taskRecoveryData = new 
TaskRecoveryData(v3taskStartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> v3taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    v3taskRecoveryDataMap.put(t1v3Id, v3taskRecoveryData);
+
+    VertexRecoveryData vertex1RecoveryData = new 
VertexRecoveryData(v1InitedEvent,
+        null, null, null, null, false);
+    VertexRecoveryData vertex2RecoveryData = new 
VertexRecoveryData(v2InitedEvent,
+        v2ReconfigureDoneEvent, v2StartedEvent, null, v2taskRecoveryDataMap, 
false);
+    VertexRecoveryData vertex3RecoveryData = new 
VertexRecoveryData(v3InitedEvent,
+        v3ReconfigureDoneEvent, v3StartedEvent, null, v3taskRecoveryDataMap, 
false);
+
+    
doReturn(vertex1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    
doReturn(vertex2RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v2Id);
+    
doReturn(vertex3RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v3Id);
+
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, 
dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
+
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+    assertEquals(DAGState.RUNNING, dag.getState());
+
+    // reinitialize v1
+    assertEquals(VertexState.INITIALIZING, v1.getState());
+
+    // v2 skip initialization
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(v2InitedTime, v2.initedTime);
+    assertEquals(v2StartedTime, v2.startedTime);
+    assertEquals(v2NumTask, v2.getTotalTasks());
+
+    // reinitialize v3
+    assertEquals(VertexState.INITED, v3.getState());
+  }
   
   /////////////////////////////// Task ////////////////////////////////////////////////////////////
   
@@ -808,8 +1032,13 @@ public class TestDAGRecovery {
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new 
VertexConfigurationDoneEvent(v1Id, 
         0L, v1NumTask, null, null, rootInputSpecs, true);
     VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, 
v1StartedTime);
+    TaskStartedEvent v1taskStartedEvent = new TaskStartedEvent(t1v1Id, 
"vertex1", 0L, 0L);
+    TaskRecoveryData v1taskRecoveryData = new 
TaskRecoveryData(v1taskStartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> v1taskRecoveryDataMap = new HashMap<>();
+    // put dummy tasks
+    v1taskRecoveryDataMap.put(t1v1Id, v1taskRecoveryData);
     VertexRecoveryData v1RecoveryData = new VertexRecoveryData(v1InitedEvent,
-        v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, 
TaskRecoveryData>(), false);
+        v1ReconfigureDoneEvent, v1StartedEvent, null, v1taskRecoveryDataMap, 
false);
     
     DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, 
dagInitedTime, 
         "user", "dagName", null);

Reply via email to