Repository: tez
Updated Branches:
  refs/heads/master 8d1ec6f59 -> 07d9146f8
TEZ-3795. Vertex state machine can throw InvalidStateTransitonException from TERMINATING state (Kuhu Shukla via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/07d9146f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/07d9146f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/07d9146f

Branch: refs/heads/master
Commit: 07d9146f80513f2571632b2857e0fa05bec5b153
Parents: 8d1ec6f
Author: Jonathan Eagles <[email protected]>
Authored: Fri Jul 14 10:29:52 2017 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Jul 14 10:31:15 2017 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  5 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 39 ++++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/07d9146f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4263094..59552f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -509,7 +509,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           // Transitions from TERMINATING state.
           .addTransition
               (VertexState.TERMINATING,
-              EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED),
+              EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED, VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition(
@@ -532,7 +532,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_RESCHEDULED))
+                  VertexEventType.V_TASK_RESCHEDULED,
+                  VertexEventType.V_COMPLETED))
 
           // Transitions from SUCCEEDED state
           .addTransition(

http://git-wip-us.apache.org/repos/asf/tez/blob/07d9146f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6eca322..d382974 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.*;
@@ -3973,6 +3974,44 @@ public class TestVertexImpl {
 
   }
 
+  @Test (timeout = 5000)
+  public void testTerminatingVertexForTaskComplete() throws Exception {
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan(false);
+    setupPostDagCreation();
+    VertexImpl vertex = spy(vertices.get("A"));
+    initVertex(vertex);
+    startVertex(vertex);
+    vertex.handle(new VertexEventTermination(vertex.getVertexId(), VertexTerminationCause.INTERNAL_ERROR));
+    dispatcher.await();
+    Assert.assertTrue(vertex.inTerminalState());
+    for (String diagnostic : vertex.getDiagnostics()) {
+      if (diagnostic.contains("Invalid event")) {
+        fail("Unexpected Invalid event transition!");
+      }
+    }
+  }
+
+  @Test (timeout = 5000)
+  public void testTerminatingVertexForVComplete() throws Exception {
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan(false);
+    setupPostDagCreation();
+    VertexImpl vertex = spy(vertices.get("A"));
+    initVertex(vertex);
+    startVertex(vertex);
+    vertex.handle(new VertexEventTermination(vertex.getVertexId(), VertexTerminationCause.INTERNAL_ERROR));
+    vertex.handle(new VertexEvent(
+        vertex.getVertexId(), VertexEventType.V_COMPLETED));
+    dispatcher.await();
+    Assert.assertTrue(vertex.inTerminalState());
+    for (String diagnostic : vertex.getDiagnostics()) {
+      if (diagnostic.contains("Invalid event")) {
+        fail("Unexpected Invalid event transition!");
+      }
+    }
+  }
+
   @Test(timeout = 5000)
   public void testVertexSuccessToRunningAfterTaskScheduler() {
     // For downstream failures
