Repository: tez
Updated Branches:
refs/heads/branch-0.5 e3b5cd54d -> dbf36fb77
TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in
recovery. (Jeff Zhang via hitesh)
(cherry picked from commit 6b6834e823e1649dc5539adbe3a40b87adfc4648)
Conflicts:
CHANGES.txt
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dbf36fb7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dbf36fb7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dbf36fb7
Branch: refs/heads/branch-0.5
Commit: dbf36fb777054fdbff290356ec5488694d77d774
Parents: e3b5cd5
Author: Hitesh Shah <[email protected]>
Authored: Wed Apr 29 14:47:48 2015 -0700
Committer: Jeff Zhang <[email protected]>
Committed: Thu Apr 30 09:27:57 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +-
.../dag/app/dag/impl/TestVertexRecovery.java | 125 ++++++++++++++++++-
3 files changed, 124 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/dbf36fb7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77312dd..258da34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in
recovery.
TEZ-2348. EOF exception during UnorderedKVReader.next().
TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job
output-path
TEZ-2303. ConcurrentModificationException while processing recovery.
http://git-wip-us.apache.org/repos/asf/tez/blob/dbf36fb7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a2fc897..f5b45d5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -287,7 +287,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
new StartRecoverTransition())
.addTransition
(VertexState.NEW,
- EnumSet.of(VertexState.INITED,
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
VertexState.INITIALIZING, VertexState.RUNNING,
VertexState.SUCCEEDED, VertexState.FAILED,
VertexState.KILLED, VertexState.ERROR,
http://git-wip-us.apache.org/repos/asf/tez/blob/dbf36fb7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 8fa574c..778f95c 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -215,6 +215,94 @@ public class TestVertexRecovery {
return dag;
}
+ /*
+ * v1
+ * |
+ * v2
+ */
+ private DAGPlan createDAGPlanMR() {
+ DAGPlan dag =
+ DAGPlan
+ .newBuilder()
+ .setName("testverteximpl")
+ .addVertex(
+ VertexPlan
+ .newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host1")
+ .addRack("rack1").build())
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder().setNumTasks(1)
+ .setVirtualCores(4).setMemoryMb(1024)
+ .setJavaOpts("").setTaskModule("x1.y1").build())
+ .addOutEdgeId("e1")
+ .addOutputs(
+ DAGProtos.RootInputLeafOutputProto
+ .newBuilder()
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("output").build())
+ .setName("outputx")
+ .setControllerDescriptor(
+ TezEntityDescriptorProto
+ .newBuilder()
+ .setClassName(
+
CountingOutputCommitter.class.getName())))
+ .build())
+ .addVertex(
+ VertexPlan
+ .newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "x2.y2"))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host2")
+ .addRack("rack2").build())
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder().setNumTasks(2)
+ .setVirtualCores(4).setMemoryMb(1024)
+ .setJavaOpts("foo").setTaskModule("x2.y2").build())
+ .addInEdgeId("e1")
+ .addOutputs(
+ DAGProtos.RootInputLeafOutputProto
+ .newBuilder()
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("output").build())
+ .setName("outputx")
+ .setControllerDescriptor(
+ TezEntityDescriptorProto
+ .newBuilder()
+ .setClassName(
+
CountingOutputCommitter.class.getName())))
+ .build()
+
+ )
+ .addEdge(
+ EdgePlan
+ .newBuilder()
+ .setEdgeDestination(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "i2_v1"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("o1"))
+ .setOutputVertexName("vertex2")
+ .setDataMovementType(
+ PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build())
+ .build();
+
+ return dag;
+ }
+
class VertexEventHanlder implements EventHandler<VertexEvent> {
private List<VertexEvent> events = new ArrayList<VertexEvent>();
@@ -607,7 +695,8 @@ public class TestVertexRecovery {
/**
* vertex1 (New) -> StartRecoveryTransition <br>
- * vertex2 (New) -> StartRecoveryTransition vertex3 (New) ->
RecoverTransition
+ * vertex2 (New) -> StartRecoveryTransition <br>
+ * vertex3 (New) -> RecoverTransition
*/
@Test
public void testRecovery_RecoveringFromNew() {
@@ -655,9 +744,39 @@ public class TestVertexRecovery {
assertOutputCommitters(vertex3);
}
-
-
+
+ /**
+ * vertex1 (New) -> StartRecoveryTransition <br>
+ * vertex2 (New) -> RecoveryTransition <br>
+ */
@Test
+ public void testMRDAG() {
+ DAGPlan dagPlan = createDAGPlanMR();
+ dag =
+ new DAGImpl(dagId, new Configuration(), dagPlan,
+ dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+ new Credentials(), new SystemClock(), user,
+ mock(TaskHeartbeatHandler.class), mockAppContext);
+ when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+ dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+
+ VertexImpl vertex1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl vertex2 = (VertexImpl)dag.getVertex("vertex2");
+ assertEquals(VertexState.NEW, vertex1.getState());
+ assertEquals(VertexState.NEW, vertex1.getState());
+
+ // vertex1 handle RecoveryEvent at the state of NEW
+ // vertex 2 handle SourceVertexRecoveryEvent at the state of NEW
+ vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
+ VertexState.RUNNING));
+ dispatcher.await();
+ assertEquals(VertexState.RUNNING, vertex1.getState());
+ assertEquals(1, vertex1.getTasks().size());
+ // verify OutputCommitter is initialized
+ assertOutputCommitters(vertex1);
+ assertEquals(VertexState.RUNNING, vertex2.getState());
+ }
+
public void testRecovery_VertexManagerErrorOnRecovery() {
VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
restoreFromInitializedEvent(vertex1);