This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 26b4fcc TEZ-4140: Tez DAG Recovery: Discrepancy In Scheduling
Vertices During Vertex Recovery (Syed Shameerur Rahman via László Bodor)
26b4fcc is described below
commit 26b4fccaba1b79903ce79087fdb8ca9d092cf6c6
Author: Syed Shameerur Rahman <[email protected]>
AuthorDate: Sun Apr 19 10:12:09 2020 +0200
TEZ-4140: Tez DAG Recovery: Discrepancy In Scheduling Vertices During
Vertex Recovery (Syed Shameerur Rahman via László Bodor)
Signed-off-by: Laszlo Bodor <[email protected]>
(cherry picked from commit e7c24f06e220cb707f114b4f5cc7210d27cce72d)
---
.../org/apache/tez/dag/app/RecoveryParser.java | 5 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 22 +-
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 259 +++++++++++++++++++--
3 files changed, 268 insertions(+), 18 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index bab6142..dfb7f61 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -74,7 +74,6 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
@@ -971,6 +970,10 @@ public class RecoveryParser {
return vertexInitedEvent != null && vertexConfigurationDoneEvent != null;
}
+ public boolean isVertexTasksStarted() {
+ return !(taskRecoveryDataMap == null || taskRecoveryDataMap.isEmpty()); // true when the map holds at least one entry
+ }
+
public boolean isVertexStarted() {
return vertexStartedEvent != null;
}
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1504c98..26ec7e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -230,6 +230,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
private final AppContext appContext;
private final DAG dag;
private final VertexRecoveryData recoveryData;
+ private boolean isVertexInitSkipped = false;
private List<TezEvent> initGeneratedEvents = new ArrayList<TezEvent>();
// set it to be true when setParallelism is called(used for recovery)
private boolean setParallelismCalledFlag = false;
@@ -2803,6 +2804,15 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
return VertexState.INITED;
}
+ private boolean isVertexInitSkippedInParentVertices() {
+ for (Vertex parent : sourceVertices.keySet()) { // only the parent vertices are inspected, not the edges
+ if (!((VertexImpl) parent).isVertexInitSkipped()) {
+ return false; // this parent did not skip initialization
+ }
+ }
+ return true; // also the result when there are no source vertices
+ }
+
private void assignVertexManager() throws TezException {
// condition for skip initializing stage
// - VertexInputInitializerEvent is seen
@@ -2815,8 +2825,10 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
// - Why using VertexReconfigureDoneEvent
// - VertexReconfigureDoneEvent represent the case that user
use API reconfigureVertex
// VertexReconfigureDoneEvent will be logged
- if (recoveryData != null
- && recoveryData.shouldSkipInit()) {
+ // - A TaskStartedEvent is seen in that vertex
+ // - All the parent vertices have also skipped the initializing stage while
recovering
+ if (recoveryData != null && recoveryData.shouldSkipInit()
+ && recoveryData.isVertexTasksStarted() &&
isVertexInitSkippedInParentVertices()) {
// Replace the original VertexManager with NoOpVertexManager if the
reconfiguration is done in the last AM attempt
VertexConfigurationDoneEvent reconfigureDoneEvent =
recoveryData.getVertexConfigurationDoneEvent();
if (LOG.isInfoEnabled()) {
@@ -2836,6 +2848,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName())
.setUserPayload(UserPayload.create(ByteBuffer.wrap(out.toByteArray()))),
dagUgi, this, appContext, stateChangeNotifier);
+ isVertexInitSkipped = true;
return;
}
@@ -4664,6 +4677,11 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
return this.vertexManager;
}
+ public boolean isVertexInitSkipped() {
+ return this.isVertexInitSkipped; // set to true on the recovery path that installs NoOpVertexManager
+ }
+
+
private static void logLocationHints(String vertexName,
VertexLocationHint locationHint) {
if (locationHint == null) {
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 260bd42..fcf6db8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -176,14 +176,23 @@ public class TestDAGRecovery {
private TezVertexID v2Id;
private TezTaskID t1v2Id;
private TezTaskAttemptID ta1t1v2Id;
+ private TezVertexID v3Id;
+ private TezTaskID t1v3Id;
+ private TezTaskAttemptID ta1t1v3Id;
////////////////////////
private Random rand = new Random();
private long dagInitedTime = System.currentTimeMillis() + rand.nextInt(100);
private long dagStartedTime = dagInitedTime + rand.nextInt(100);
private long v1InitedTime = dagStartedTime + rand.nextInt(100);
+ private long v2InitedTime = dagStartedTime + rand.nextInt(100);
+ private long v3InitedTime = Math.max(v1InitedTime, v2InitedTime) +
rand.nextInt(100);
private long v1StartedTime = v1InitedTime + rand.nextInt(100);
+ private long v2StartedTime = v2InitedTime + rand.nextInt(100);
+ private long v3StartedTime = v3InitedTime + rand.nextInt(100);
private int v1NumTask = 10;
+ private int v2NumTask = 5;
+ private int v3NumTask = 2;
private long t1StartedTime = v1StartedTime + rand.nextInt(100);
private long t1FinishedTime = t1StartedTime + rand.nextInt(100);
private long ta1LaunchTime = t1StartedTime + rand.nextInt(100);
@@ -354,6 +363,9 @@ public class TestDAGRecovery {
v2Id = TezVertexID.getInstance(dagId, 1);
t1v2Id = TezTaskID.getInstance(v2Id, 0);
ta1t1v2Id = TezTaskAttemptID.getInstance(t1v2Id, 0);
+ v3Id = TezVertexID.getInstance(dagId, 2);
+ t1v3Id = TezTaskID.getInstance(v3Id, 0);
+ ta1t1v3Id = TezTaskAttemptID.getInstance(t1v3Id, 0);
dispatcher.register(CallableEventType.class, new
CallableEventDispatcher());
taskEventDispatcher = new TaskEventDispatcher();
@@ -724,7 +736,7 @@ public class TestDAGRecovery {
* DAG: DAGInitedEvent -> DAGStartedEvent
* V1: VertexReconfigrationDoneEvent -> VertexInitializedEvent
*
- * V1 skip initialization.
+ * Reinitialize V1 again.
*/
@Test//(timeout=5000)
public void testVertexRecoverFromInitedAndReconfigureDone() {
@@ -751,35 +763,75 @@ public class TestDAGRecovery {
VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
assertEquals(DAGState.RUNNING, dag.getState());
- // v1 skip initialization
- assertEquals(VertexState.RUNNING, v1.getState());
- assertEquals(v1InitedTime, v1.initedTime);
- assertEquals(v1NumTask, v1.getTotalTasks());
+ // reinitialize v1
+ assertEquals(VertexState.INITIALIZING, v1.getState());
assertEquals(VertexState.RUNNING, v2.getState());
- assertEquals(VertexState.RUNNING, v3.getState());
+ assertEquals(VertexState.INITED, v3.getState());
}
-
+
/**
* RecoveryEvents:
* DAG: DAGInitedEvent -> DAGStartedEvent
* V1: VertexReconfigrationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent
- *
- * V1 skip initialization.
+ *
+ * Reinitialize V1 again.
*/
@Test(timeout=5000)
public void testVertexRecoverFromStart() {
- initMockDAGRecoveryDataForVertex();
+ initMockDAGRecoveryDataForVertex();
List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
- VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
- "vertex1", 0L, v1InitedTime,
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
v1NumTask, "", null, inputGeneratedTezEvents, null);
- VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
0L, v1NumTask, null, null, null, true);
VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L,
v1StartedTime);
VertexRecoveryData vertexRecoveryData = new
VertexRecoveryData(v1InitedEvent,
v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID,
TaskRecoveryData>(), false);
doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
-
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId,
dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+ assertEquals(DAGState.RUNNING, dag.getState());
+ // reinitialize v1
+ assertEquals(VertexState.INITIALIZING, v1.getState());
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(VertexState.INITED, v3.getState());
+ }
+
+ /**
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexConfigurationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent -> TaskStartedEvent
+ *
+ * V1 skips initialization.
+ */
+ @Test(timeout=5000)
+ public void testVertexRecoverFromVertexTaskStart() {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, null, true);
+ VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L,
v1StartedTime);
+
+ TaskStartedEvent taskStartedEvent = new TaskStartedEvent(t1v1Id, "vertex1",
0L, 0L);
+ TaskRecoveryData taskRecoveryData = new TaskRecoveryData(taskStartedEvent,
null, null);
+ Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<>();
+ // dummy task entry, keyed by the v1 task the TaskStartedEvent refers to
+ taskRecoveryDataMap.put(t1v1Id, taskRecoveryData);
+
+ VertexRecoveryData vertexRecoveryData = new
VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, v1StartedEvent, null, taskRecoveryDataMap,
false);
+
doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId,
dagRecoveryData);
dag.handle(recoveryEvent);
dispatcher.await();
@@ -796,6 +848,178 @@ public class TestDAGRecovery {
assertEquals(VertexState.RUNNING, v2.getState());
assertEquals(VertexState.RUNNING, v3.getState());
}
+
+ /**
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexConfigurationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent -> TaskStartedEvent
+ * V2: VertexConfigurationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent -> TaskStartedEvent
+ * V3: VertexConfigurationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent -> TaskStartedEvent
+ *
+ * V1 skips initialization.
+ * V2 skips initialization.
+ * V3 skips initialization.
+ */
+ @Test(timeout=5000)
+ public void testMultipleVertexRecoverFromVertexTaskStart() {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
+ VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
+ "vertex2", 0L, v2InitedTime,
+ v2NumTask, "", null, inputGeneratedTezEvents, null);
+ VertexInitializedEvent v3InitedEvent = new VertexInitializedEvent(v3Id,
+ "vertex3", 0L, v3InitedTime,
+ v3NumTask, "", null, inputGeneratedTezEvents, null);
+
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, null, true);
+ VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v2Id,
+ 0L, v2NumTask, null, null, null, true);
+ VertexConfigurationDoneEvent v3ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v3Id,
+ 0L, v3NumTask, null, null, null, true);
+
+ VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L,
v1StartedTime);
+ VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L,
v2StartedTime);
+ VertexStartedEvent v3StartedEvent = new VertexStartedEvent(v3Id, 0L,
v3StartedTime);
+
+ TaskStartedEvent v1taskStartedEvent = new TaskStartedEvent(t1v1Id,
"vertex1", 0L, 0L);
+ TaskRecoveryData v1taskRecoveryData = new
TaskRecoveryData(v1taskStartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> v1taskRecoveryDataMap = new HashMap<>();
+ // dummy entry: one v1 task with only a TaskStartedEvent recorded
+ v1taskRecoveryDataMap.put(t1v1Id, v1taskRecoveryData);
+
+ TaskStartedEvent v2taskStartedEvent = new TaskStartedEvent(t1v2Id,
"vertex2", 0L, 0L);
+ TaskRecoveryData v2taskRecoveryData = new
TaskRecoveryData(v2taskStartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> v2taskRecoveryDataMap = new HashMap<>();
+ // dummy entry: one v2 task with only a TaskStartedEvent recorded
+ v2taskRecoveryDataMap.put(t1v2Id, v2taskRecoveryData);
+
+ TaskStartedEvent v3taskStartedEvent = new TaskStartedEvent(t1v3Id,
"vertex3", 0L, 0L);
+ TaskRecoveryData v3taskRecoveryData = new
TaskRecoveryData(v3taskStartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> v3taskRecoveryDataMap = new HashMap<>();
+ // dummy entry: one v3 task with only a TaskStartedEvent recorded
+ v3taskRecoveryDataMap.put(t1v3Id, v3taskRecoveryData);
+
+ VertexRecoveryData vertex1RecoveryData = new
VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, v1StartedEvent, null, v1taskRecoveryDataMap,
false);
+ VertexRecoveryData vertex2RecoveryData = new
VertexRecoveryData(v2InitedEvent,
+ v2ReconfigureDoneEvent, v2StartedEvent, null, v2taskRecoveryDataMap,
false);
+ VertexRecoveryData vertex3RecoveryData = new
VertexRecoveryData(v3InitedEvent,
+ v3ReconfigureDoneEvent, v3StartedEvent, null, v3taskRecoveryDataMap,
false);
+
+
doReturn(vertex1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
doReturn(vertex2RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v2Id);
+
doReturn(vertex3RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v3Id);
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId,
dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+
+ assertEquals(DAGState.RUNNING, dag.getState());
+
+ // v1 skips initialization: recovered times and task count are kept
+ assertEquals(VertexState.RUNNING, v1.getState());
+ assertEquals(v1InitedTime, v1.initedTime);
+ assertEquals(v1StartedTime, v1.startedTime);
+ assertEquals(v1NumTask, v1.getTotalTasks());
+
+ // v2 skips initialization: recovered times and task count are kept
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(v2InitedTime, v2.initedTime);
+ assertEquals(v2StartedTime, v2.startedTime);
+ assertEquals(v2NumTask, v2.getTotalTasks());
+
+ // v3 skips initialization: recovered times and task count are kept
+ assertEquals(VertexState.RUNNING, v3.getState());
+ assertEquals(v3InitedTime, v3.initedTime);
+ assertEquals(v3StartedTime, v3.startedTime);
+ assertEquals(v3NumTask, v3.getTotalTasks());
+ }
+
+ /**
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexInitializedEvent
+ * V2: VertexConfigurationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent -> TaskStartedEvent
+ * V3: VertexConfigurationDoneEvent -> VertexInitializedEvent ->
VertexStartedEvent -> TaskStartedEvent
+ *
+ * V1 is reinitialized.
+ * V2 skips initialization.
+ * V3 is reinitialized, because it depends on V1.
+ */
+ @Test(timeout=5000)
+ public void testMultipleVertexRecoverFromVertex() {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
+ VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
+ "vertex2", 0L, v2InitedTime,
+ v2NumTask, "", null, inputGeneratedTezEvents, null);
+ VertexInitializedEvent v3InitedEvent = new VertexInitializedEvent(v3Id,
+ "vertex3", 0L, v3InitedTime,
+ v3NumTask, "", null, inputGeneratedTezEvents, null);
+
+ VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v2Id,
+ 0L, v2NumTask, null, null, null, true);
+ VertexConfigurationDoneEvent v3ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v3Id,
+ 0L, v3NumTask, null, null, null, true);
+
+ VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L,
v2StartedTime);
+ VertexStartedEvent v3StartedEvent = new VertexStartedEvent(v3Id, 0L,
v3StartedTime);
+
+ TaskStartedEvent v2taskStartedEvent = new TaskStartedEvent(t1v2Id,
"vertex2", 0L, 0L);
+ TaskRecoveryData v2taskRecoveryData = new
TaskRecoveryData(v2taskStartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> v2taskRecoveryDataMap = new HashMap<>();
+ // dummy entry: one v2 task with only a TaskStartedEvent recorded
+ v2taskRecoveryDataMap.put(t1v2Id, v2taskRecoveryData);
+
+ TaskStartedEvent v3taskStartedEvent = new TaskStartedEvent(t1v3Id,
"vertex3", 0L, 0L);
+ TaskRecoveryData v3taskRecoveryData = new
TaskRecoveryData(v3taskStartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> v3taskRecoveryDataMap = new HashMap<>();
+ // dummy entry: one v3 task with only a TaskStartedEvent recorded
+ v3taskRecoveryDataMap.put(t1v3Id, v3taskRecoveryData);
+
+ VertexRecoveryData vertex1RecoveryData = new
VertexRecoveryData(v1InitedEvent,
+ null, null, null, null, false);
+ VertexRecoveryData vertex2RecoveryData = new
VertexRecoveryData(v2InitedEvent,
+ v2ReconfigureDoneEvent, v2StartedEvent, null, v2taskRecoveryDataMap,
false);
+ VertexRecoveryData vertex3RecoveryData = new
VertexRecoveryData(v3InitedEvent,
+ v3ReconfigureDoneEvent, v3StartedEvent, null, v3taskRecoveryDataMap,
false);
+
+
doReturn(vertex1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
doReturn(vertex2RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v2Id);
+
doReturn(vertex3RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v3Id);
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId,
dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+ assertEquals(DAGState.RUNNING, dag.getState());
+
+ // v1 is reinitialized: only its VertexInitializedEvent was recovered
+ assertEquals(VertexState.INITIALIZING, v1.getState());
+
+ // v2 skips initialization: recovered times and task count are kept
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(v2InitedTime, v2.initedTime);
+ assertEquals(v2StartedTime, v2.startedTime);
+ assertEquals(v2NumTask, v2.getTotalTasks());
+
+ // v3 is reinitialized despite having recorded task starts
+ assertEquals(VertexState.INITED, v3.getState());
+ }
/////////////////////////////// Task
////////////////////////////////////////////////////////////
@@ -808,8 +1032,13 @@ public class TestDAGRecovery {
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
0L, v1NumTask, null, null, rootInputSpecs, true);
VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L,
v1StartedTime);
+ TaskStartedEvent v1taskStartedEvent = new TaskStartedEvent(t1v1Id,
"vertex1", 0L, 0L);
+ TaskRecoveryData v1taskRecoveryData = new
TaskRecoveryData(v1taskStartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> v1taskRecoveryDataMap = new HashMap<>();
+ // put dummy tasks
+ v1taskRecoveryDataMap.put(t1v1Id, v1taskRecoveryData);
VertexRecoveryData v1RecoveryData = new VertexRecoveryData(v1InitedEvent,
- v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID,
TaskRecoveryData>(), false);
+ v1ReconfigureDoneEvent, v1StartedEvent, null, v1taskRecoveryDataMap,
false);
DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId,
dagInitedTime,
"user", "dagName", null);