Repository: tez
Updated Branches:
  refs/heads/branch-0.5 e3b5cd54d -> dbf36fb77


TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in 
recovery. (Jeff Zhang via hitesh)

(cherry picked from commit 6b6834e823e1649dc5539adbe3a40b87adfc4648)

Conflicts:
        CHANGES.txt
        
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dbf36fb7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dbf36fb7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dbf36fb7

Branch: refs/heads/branch-0.5
Commit: dbf36fb777054fdbff290356ec5488694d77d774
Parents: e3b5cd5
Author: Hitesh Shah <[email protected]>
Authored: Wed Apr 29 14:47:48 2015 -0700
Committer: Jeff Zhang <[email protected]>
Committed: Thu Apr 30 09:27:57 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +-
 .../dag/app/dag/impl/TestVertexRecovery.java    | 125 ++++++++++++++++++-
 3 files changed, 124 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dbf36fb7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77312dd..258da34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in 
recovery.
   TEZ-2348. EOF exception during UnorderedKVReader.next().
   TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job 
output-path
   TEZ-2303. ConcurrentModificationException while processing recovery.

http://git-wip-us.apache.org/repos/asf/tez/blob/dbf36fb7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a2fc897..f5b45d5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -287,7 +287,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex,
                   new StartRecoverTransition())
           .addTransition
               (VertexState.NEW,
-                  EnumSet.of(VertexState.INITED,
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
                       VertexState.INITIALIZING, VertexState.RUNNING,
                       VertexState.SUCCEEDED, VertexState.FAILED,
                       VertexState.KILLED, VertexState.ERROR,

http://git-wip-us.apache.org/repos/asf/tez/blob/dbf36fb7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 8fa574c..778f95c 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -215,6 +215,94 @@ public class TestVertexRecovery {
     return dag;
   }
 
+  /*
+   * v1
+   *  |
+   * v2
+   */
+  private DAGPlan createDAGPlanMR() {
+    DAGPlan dag =
+        DAGPlan
+            .newBuilder()
+            .setName("testverteximpl")
+            .addVertex(
+                VertexPlan
+                    .newBuilder()
+                    .setName("vertex1")
+                    .setType(PlanVertexType.NORMAL)
+                    .addTaskLocationHint(
+                        PlanTaskLocationHint.newBuilder().addHost("host1")
+                            .addRack("rack1").build())
+                    .setTaskConfig(
+                        PlanTaskConfiguration.newBuilder().setNumTasks(1)
+                            .setVirtualCores(4).setMemoryMb(1024)
+                            .setJavaOpts("").setTaskModule("x1.y1").build())
+                    .addOutEdgeId("e1")
+                    .addOutputs(
+                        DAGProtos.RootInputLeafOutputProto
+                            .newBuilder()
+                            .setIODescriptor(
+                                TezEntityDescriptorProto.newBuilder()
+                                    .setClassName("output").build())
+                            .setName("outputx")
+                            .setControllerDescriptor(
+                                TezEntityDescriptorProto
+                                    .newBuilder()
+                                    .setClassName(
+                                        
CountingOutputCommitter.class.getName())))
+                    .build())
+            .addVertex(
+                VertexPlan
+                    .newBuilder()
+                    .setName("vertex2")
+                    .setType(PlanVertexType.NORMAL)
+                    .setProcessorDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName(
+                            "x2.y2"))
+                    .addTaskLocationHint(
+                        PlanTaskLocationHint.newBuilder().addHost("host2")
+                            .addRack("rack2").build())
+                    .setTaskConfig(
+                        PlanTaskConfiguration.newBuilder().setNumTasks(2)
+                            .setVirtualCores(4).setMemoryMb(1024)
+                            .setJavaOpts("foo").setTaskModule("x2.y2").build())
+                    .addInEdgeId("e1")
+                    .addOutputs(
+                        DAGProtos.RootInputLeafOutputProto
+                            .newBuilder()
+                            .setIODescriptor(
+                                TezEntityDescriptorProto.newBuilder()
+                                    .setClassName("output").build())
+                            .setName("outputx")
+                            .setControllerDescriptor(
+                                TezEntityDescriptorProto
+                                    .newBuilder()
+                                    .setClassName(
+                                        
CountingOutputCommitter.class.getName())))
+                    .build()
+
+            )
+            .addEdge(
+                EdgePlan
+                    .newBuilder()
+                    .setEdgeDestination(
+                        TezEntityDescriptorProto.newBuilder().setClassName(
+                            "i2_v1"))
+                    .setInputVertexName("vertex1")
+                    .setEdgeSource(
+                        TezEntityDescriptorProto.newBuilder()
+                            .setClassName("o1"))
+                    .setOutputVertexName("vertex2")
+                    .setDataMovementType(
+                        PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1")
+                    .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                    .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                    .build())
+            .build();
+
+    return dag;
+  }
+
   class VertexEventHanlder implements EventHandler<VertexEvent> {
 
     private List<VertexEvent> events = new ArrayList<VertexEvent>();
@@ -607,7 +695,8 @@ public class TestVertexRecovery {
 
   /**
    * vertex1 (New) -> StartRecoveryTransition <br>
-   * vertex2 (New) -> StartRecoveryTransition vertex3 (New) -> 
RecoverTransition
+   * vertex2 (New) -> StartRecoveryTransition <br>
+   * vertex3 (New) -> RecoverTransition
    */
   @Test
   public void testRecovery_RecoveringFromNew() {
@@ -655,9 +744,39 @@ public class TestVertexRecovery {
     assertOutputCommitters(vertex3);
 
   }
-  
-  
+ 
+  /**
+   * vertex1 (New) -> StartRecoveryTransition <br>
+   * vertex2 (New) -> RecoveryTransition <br>
+   */
   @Test
+  public void testMRDAG() {
+    DAGPlan dagPlan = createDAGPlanMR();
+    dag =
+        new DAGImpl(dagId, new Configuration(), dagPlan,
+            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            new Credentials(), new SystemClock(), user,
+            mock(TaskHeartbeatHandler.class), mockAppContext);
+    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+
+    VertexImpl vertex1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl vertex2 = (VertexImpl)dag.getVertex("vertex2");
+    assertEquals(VertexState.NEW, vertex1.getState());
+    assertEquals(VertexState.NEW, vertex1.getState());
+
+    // vertex1 handle RecoveryEvent at the state of NEW
+    // vertex 2 handle SourceVertexRecoveryEvent at the state of NEW
+    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
+        VertexState.RUNNING));
+    dispatcher.await();
+    assertEquals(VertexState.RUNNING, vertex1.getState());
+    assertEquals(1, vertex1.getTasks().size());
+    // verify OutputCommitter is initialized
+    assertOutputCommitters(vertex1);
+    assertEquals(VertexState.RUNNING, vertex2.getState());
+  }
+
   public void testRecovery_VertexManagerErrorOnRecovery() {
     VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
     restoreFromInitializedEvent(vertex1);

Reply via email to