TEZ-172. TestVertexImpl hangs (intermittent failure).
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d94d37af Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d94d37af Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d94d37af Branch: refs/heads/master Commit: d94d37afbe9a0cc045e2c8717c93fa5446b589b9 Parents: d3b1921 Author: Hitesh Shah <[email protected]> Authored: Fri May 31 17:10:14 2013 -0700 Committer: Mike Liddell <[email protected]> Committed: Fri May 31 18:09:37 2013 -0700 ---------------------------------------------------------------------- pom.xml | 3 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 6 +- .../apache/tez/dag/app/dag/impl/TestDAGImpl.java | 68 +++-- .../tez/dag/app/dag/impl/TestVertexImpl.java | 262 ++++++++++----- 4 files changed, 218 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 721df2f..4c5d0c5 100644 --- a/pom.xml +++ b/pom.xml @@ -325,7 +325,8 @@ <artifactId>maven-surefire-plugin</artifactId> <version>2.14.1</version> <configuration> - <forkMode>always</forkMode> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds> <argLine>-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine> <redirectTestOutputToFile>true</redirectTestOutputToFile> http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index c7bceb6..b06e264 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -613,7 +613,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { String message = "Invalid event " + event.getType() + - " on vertex " + this.vertexId + + " on vertex " + this.vertexName + + " with vertexId " + this.vertexId + " at current state " + oldState; LOG.error("Can't handle " + message, e); addDiagnostic(message); @@ -630,6 +631,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, finally { writeLock.unlock(); } + LOG.info("DEBUG: Finished processing VertexEvent " + event.getVertexId() + + " of type " + event.getType() + " while in state " + + getInternalState() + ". Event: " + event); } private VertexState getInternalState() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 3adcd8b..9e94cbd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -373,19 +373,23 @@ public class TestDAGImpl { @After public void teardown() { - dagPlan = null; - dag = null; dispatcher.await(); dispatcher.stop(); + dagPlan = null; + dag = null; } private void initDAG(DAGImpl dag) { - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); + dag.handle( + new DAGEvent(dagId, DAGEventType.DAG_INIT)); Assert.assertEquals(DAGState.INITED, dag.getState()); } + @SuppressWarnings("unchecked") private void startDAG(DAGImpl dag) { - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START)); + dispatcher.getEventHandler().handle( + new DAGEvent(dagId, DAGEventType.DAG_START)); + dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dag.getState()); } @@ -432,9 +436,9 @@ public class TestDAGImpl { TezVertexID vId = new TezVertexID(dagId, 1); Vertex v = dag.getVertex(vId); - ((EventHandler<VertexEvent>) v).handle(new VertexEventTaskCompleted( + dispatcher.getEventHandler().handle(new VertexEventTaskCompleted( new TezTaskID(vId, 0), TaskState.SUCCEEDED)); - ((EventHandler<VertexEvent>) v).handle(new VertexEventTaskCompleted( + dispatcher.getEventHandler().handle(new VertexEventTaskCompleted( new TezTaskID(vId, 1), TaskState.SUCCEEDED)); dispatcher.await(); @@ -442,12 +446,14 @@ public class TestDAGImpl { Assert.assertEquals(1, dag.getSuccessfulVertices()); } + @SuppressWarnings("unchecked") public void testKillStartedDAG() { initDAG(dag); startDAG(dag); dispatcher.await(); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle( + new DAGEvent(dagId, DAGEventType.DAG_KILL)); dispatcher.await(); Assert.assertEquals(DAGState.KILLED, dag.getState()); @@ -479,7 +485,7 @@ public class TestDAGImpl { Assert.assertEquals(VertexState.SUCCEEDED, v0.getState()); Assert.assertEquals(VertexState.RUNNING, v1.getState()); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); dispatcher.await(); Assert.assertEquals(DAGState.KILL_WAIT, dag.getState()); @@ -493,13 +499,16 @@ public class TestDAGImpl { Assert.assertEquals(1, dag.getSuccessfulVertices()); } + @SuppressWarnings("unchecked") @Test public void testInvalidEvent() { - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START)); + dispatcher.getEventHandler().handle( + new DAGEvent(dagId, DAGEventType.DAG_START)); dispatcher.await(); Assert.assertEquals(DAGState.ERROR, dag.getState()); } + @SuppressWarnings("unchecked") @Test @Ignore public void testVertexSuccessfulCompletionUpdates() { @@ -508,28 +517,29 @@ public class TestDAGImpl { dispatcher.await(); for (int i = 0; i < 6; ++i) { - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); } dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dag.getState()); Assert.assertEquals(1, dag.getSuccessfulVertices()); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 2), VertexState.SUCCEEDED)); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 3), VertexState.SUCCEEDED)); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 4), VertexState.SUCCEEDED)); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 5), VertexState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(DAGState.SUCCEEDED, dag.getState()); Assert.assertEquals(6, dag.getSuccessfulVertices()); } + @SuppressWarnings("unchecked") @Test @Ignore public void testVertexFailureHandling() { @@ -537,14 +547,14 @@ public class TestDAGImpl { startDAG(dag); dispatcher.await(); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dag.getState()); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 2), VertexState.FAILED)); dispatcher.await(); Assert.assertEquals(DAGState.FAILED, dag.getState()); @@ -558,6 +568,7 @@ public class TestDAGImpl { } } + @SuppressWarnings("unchecked") @Test @Ignore public void testDAGKill() { @@ -565,17 +576,18 @@ public class TestDAGImpl { startDAG(dag); dispatcher.await(); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dag.getState()); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle( + new DAGEvent(dagId, DAGEventType.DAG_KILL)); for (int i = 2; i < 6; ++i) { - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, i), VertexState.SUCCEEDED)); } dispatcher.await(); @@ -584,29 +596,31 @@ public class TestDAGImpl { Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); } + @SuppressWarnings("unchecked") @Test public void testDAGKillPending() { initDAG(dag); startDAG(dag); dispatcher.await(); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 0), VertexState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dag.getState()); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 1), VertexState.SUCCEEDED)); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle( + new DAGEvent(dagId, DAGEventType.DAG_KILL)); for (int i = 2; i < 5; ++i) { - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, i), VertexState.SUCCEEDED)); } dispatcher.await(); Assert.assertEquals(DAGState.KILL_WAIT, dag.getState()); - dag.handle(new DAGEventVertexCompleted( + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( new TezVertexID(dagId, 5), VertexState.KILLED)); dispatcher.await(); Assert.assertEquals(DAGState.KILLED, dag.getState()); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index e4a1040..41b500f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -413,11 +413,12 @@ public class TestVertexImpl { vertexIdMap = new HashMap<TezVertexID, VertexImpl>(); for (int i = 0; i < vCnt; ++i) { VertexPlan vPlan = dagPlan.getVertex(i); + String vName = vPlan.getName(); TezVertexID vertexId = new TezVertexID(dagId, i+1); VertexImpl v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens, clock, thh, appContext, vertexLocationHint); - vertices.put(vPlan.getName(), v); + vertices.put(vName, v); vertexIdMap.put(vertexId, v); } } @@ -492,16 +493,29 @@ public class TestVertexImpl { @After public void teardown() { + dispatcher.await(); + dispatcher.stop(); + dispatcher = null; + vertexEventDispatcher = null; + dagEventDispatcher = null; dagPlan = null; this.vertices = null; this.edges = null; - dispatcher.await(); - dispatcher.stop(); + this.vertexIdMap = null; } + private void initAllVertices() { + for (int i = 1; i <= 6; ++i) { + VertexImpl v = vertices.get("vertex" + i); + initVertex(v); + } + } + + + @SuppressWarnings("unchecked") private void initVertex(VertexImpl v) { Assert.assertEquals(VertexState.NEW, v.getState()); - v.handle(new VertexEvent(v.getVertexId(), + dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(), VertexEventType.V_INIT)); dispatcher.await(); Assert.assertEquals(VertexState.INITED, v.getState()); @@ -511,8 +525,10 @@ public class TestVertexImpl { startVertex(v, true); } + @SuppressWarnings("unchecked") private void killVertex(VertexImpl v, boolean checkKillWait) { - v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); + dispatcher.getEventHandler().handle( + new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); dispatcher.await(); if (checkKillWait) { Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); @@ -521,10 +537,11 @@ public class TestVertexImpl { } } + @SuppressWarnings("unchecked") private void startVertex(VertexImpl v, boolean checkRunningState) { Assert.assertEquals(VertexState.INITED, v.getState()); - v.handle(new VertexEvent(v.getVertexId(), + dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(), VertexEventType.V_START)); dispatcher.await(); if (checkRunningState) { @@ -532,7 +549,7 @@ public class TestVertexImpl { } } - @Test + @Test(timeout = 5000) public void testVertexInit() { VertexImpl v = vertices.get("vertex2"); initVertex(v); @@ -583,66 +600,79 @@ public class TestVertexImpl { .getOutputClassName())); } - @Test + @Test(timeout = 5000) public void testVertexStart() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); startVertex(v); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testBasicVertexCompletion() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(1, v.getCompletedTasks()); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(2, v.getCompletedTasks()); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) @Ignore // FIXME fix verteximpl for this test to work public void testDuplicateTaskCompletion() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexFailure() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); - v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.FAILED)); dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v.getState()); String diagnostics = @@ -650,17 +680,18 @@ public class TestVertexImpl { Assert.assertTrue(diagnostics.contains("task failed " + t1.toString())); } - @Test + @Test(timeout = 5000) public void testVertexWithNoTasks() { // FIXME a vertex with no tasks should not be allowed + initAllVertices(); + VertexImpl v = vertices.get("vertex1"); - initVertex(v); startVertex(v, false); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); } - @Test + @Test(timeout = 5000) public void testVertexKillDiagnostics() { VertexImpl v1 = vertices.get("vertex1"); killVertex(v1, false); @@ -679,7 +710,14 @@ public class TestVertexImpl { "vertex received kill in inited state")); VertexImpl v3 = vertices.get("vertex3"); + VertexImpl v4 = vertices.get("vertex4"); + VertexImpl v5 = vertices.get("vertex5"); + VertexImpl v6 = vertices.get("vertex6"); initVertex(v3); + initVertex(v4); + initVertex(v5); + initVertex(v6); + startVertex(v3); killVertex(v3, true); diagnostics = @@ -688,68 +726,77 @@ public class TestVertexImpl { "vertex received kill while in running state")); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexKillPending() { - VertexImpl v = vertices.get("vertex2"); - initVertex(v); - VertexImpl v3 = vertices.get("vertex3"); - initVertex(v3); + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); startVertex(v); - v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); + dispatcher.getEventHandler().handle( + new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); + dispatcher.await(); Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); - v.handle(new VertexEventTaskCompleted( - new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED)); + dispatcher.await(); Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); - v.handle(new VertexEventTaskCompleted( - new TezTaskID(v.getVertexId(), 1), TaskState.KILLED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 1), TaskState.KILLED)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v.getState()); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) @Ignore public void testVertexKill() { - VertexImpl v = vertices.get("vertex2"); - initVertex(v); - VertexImpl v3 = vertices.get("vertex3"); - initVertex(v3); + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); startVertex(v); - v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); + dispatcher.getEventHandler().handle( + new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)); Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); - v.handle(new VertexEventTaskCompleted( - new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED)); Assert.assertEquals(VertexState.KILL_WAIT, v.getState()); - v.handle(new VertexEventTaskCompleted( - new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted( + new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v.getState()); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) @Ignore public void testKilledTasksHandling() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.FAILED)); dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v.getState()); Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState()); } - @Test + @Test(timeout = 5000) public void testVertexCommitterInit() { VertexImpl v2 = vertices.get("vertex2"); initVertex(v2); @@ -762,7 +809,7 @@ public class TestVertexImpl { instanceof MRVertexOutputCommitter); } - @Test + @Test(timeout = 5000) public void testVertexSchedulerInit() { VertexImpl v2 = vertices.get("vertex2"); initVertex(v2); @@ -775,10 +822,13 @@ public class TestVertexImpl { instanceof BipartiteSlowStartVertexScheduler); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexTaskFailure() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); + CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(); v.setVertexOutputCommitter(committer); @@ -787,62 +837,66 @@ public class TestVertexImpl { TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); - v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED)); - v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.FAILED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.FAILED)); dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v.getState()); Assert.assertEquals(0, committer.commitCounter); Assert.assertEquals(1, committer.abortCounter); } - @Test + @Test(timeout = 5000) public void testSourceVertexStartHandling() { + LOG.info("Testing testSourceVertexStartHandling"); + initAllVertices(); + VertexImpl v4 = vertices.get("vertex4"); - initVertex(v4); VertexImpl v5 = vertices.get("vertex5"); - initVertex(v5); VertexImpl v6 = vertices.get("vertex6"); - initVertex(v6); - Assert.assertEquals(VertexState.INITED, v6.getState()); startVertex(v4); startVertex(v5); dispatcher.await(); + LOG.info("Verifying v6 state " + v6.getState()); Assert.assertEquals(VertexState.RUNNING, v6.getState()); Assert.assertEquals(1, v6.getDistanceFromRoot()); } - @Test + @Test(timeout = 5000) public void testCounters() { // FIXME need to test counters at vertex level } - @Test + @Test(timeout = 5000) public void testDiagnostics() { // FIXME need to test diagnostics in various cases } - @Test + @Test(timeout = 5000) public void testTaskAttemptCompletionEvents() { // FIXME need to test handling of task attempt events } - @Test + @Test(timeout = 5000) public void testSourceTaskAttemptCompletionEvents() { + LOG.info("Testing testSourceTaskAttemptCompletionEvents"); + initAllVertices(); + VertexImpl v4 = vertices.get("vertex4"); - initVertex(v4); VertexImpl v5 = vertices.get("vertex5"); - initVertex(v5); VertexImpl v6 = vertices.get("vertex6"); - initVertex(v6); startVertex(v4); startVertex(v5); dispatcher.await(); + LOG.info("Verifying v6 state " + v6.getState()); Assert.assertEquals(VertexState.RUNNING, v6.getState()); TezTaskID t1_v4 = new TezTaskID(v4.getVertexId(), 0); @@ -897,17 +951,21 @@ public class TestVertexImpl { Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testDAGEventGeneration() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); startVertex(v); TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, @@ -915,11 +973,14 @@ public class TestVertexImpl { DAGEventType.DAG_VERTEX_COMPLETED).intValue()); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testTaskReschedule() { // For downstream failures + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); + CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(); v.setVertexOutputCommitter(committer); @@ -929,26 +990,33 @@ public class TestVertexImpl { TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); - v.handle(new VertexEventTaskReschedule(t1)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskReschedule(t1)); // FIXME need to handle dups - // v.handle(new VertexEventTaskReschedule(t1)); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + // dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(t1)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(0, committer.commitCounter); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, committer.commitCounter); } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexCommit() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); + CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(); v.setVertexOutputCommitter(committer); @@ -958,14 +1026,18 @@ public class TestVertexImpl { TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, committer.commitCounter); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, committer.commitCounter); @@ -974,20 +1046,23 @@ public class TestVertexImpl { Assert.assertEquals(0, committer.setupCounter); // already done in init } - @Test + @Test(timeout = 5000) public void testCommitterInitAndSetup() { // FIXME need to add a test for this } - @Test + @Test(timeout = 5000) public void testTaskAttemptFetchFailureHandling() { // FIXME needs testing } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testBadCommitter() { + initAllVertices(); + VertexImpl v = vertices.get("vertex2"); - initVertex(v); + CountingVertexOutputCommitter committer = new CountingVertexOutputCommitter(true, true); v.setVertexOutputCommitter(committer); @@ -997,8 +1072,10 @@ public class TestVertexImpl { TezTaskID t1 = new TezTaskID(v.getVertexId(), 0); TezTaskID t2 = new TezTaskID(v.getVertexId(), 1); - v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); - v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v.getState()); Assert.assertEquals(1, committer.commitCounter); @@ -1009,14 +1086,15 @@ public class TestVertexImpl { Assert.assertEquals(0, committer.setupCounter); // already done in init } - @Test + @Test(timeout = 5000) public void testHistoryEventGeneration() { } - @Test + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testInvalidEvent() { VertexImpl v = vertices.get("vertex2"); - v.handle(new VertexEvent(v.getVertexId(), + dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(), VertexEventType.V_START)); dispatcher.await(); Assert.assertEquals(VertexState.ERROR, v.getState());
