Repository: tez Updated Branches: refs/heads/master 8b412ee66 -> d63d6ee60
TEZ-2857. Fix flakey tests in TestDAGImpl. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d63d6ee6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d63d6ee6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d63d6ee6 Branch: refs/heads/master Commit: d63d6ee600464662670058485492ec56ae13cffe Parents: 8b412ee Author: Siddharth Seth <[email protected]> Authored: Mon Sep 28 17:47:50 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon Sep 28 17:47:50 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/dag/app/dag/impl/TestDAGImpl.java | 99 +++++++++++++++----- 2 files changed, 76 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d63d6ee6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d219127..07769d8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2857. Fix flakey tests in TestDAGImpl. TEZ-2836. Avoid setting framework/system counters for tasks running in threads. TEZ-2398. Flaky test: TestFaultTolerance TEZ-2833. Dont create extra directory during ATS file download @@ -191,6 +192,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2857. Fix flakey tests in TestDAGImpl. TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2853. 
Tez UI: task attempt page is coming empty http://git-wip-us.apache.org/repos/asf/tez/blob/d63d6ee6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index dba6c01..66d1012 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -1007,7 +1007,7 @@ public class TestDAGImpl { dispatcher.await(); Assert.assertEquals(DAGState.FAILED, dag.getState()); Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause()); - Assert.assertTrue(StringUtils.join(dag.getDiagnostics(),",") + Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), ",") .contains("Vertex's TaskResource is beyond the cluster container capability")); } @@ -1093,7 +1093,7 @@ public class TestDAGImpl { Assert.assertEquals(1, dag.getSuccessfulVertices()); // 2 tasks completed, total plan has 11 vertices - Assert.assertEquals((float)2/11, + Assert.assertEquals((float) 2 / 11, dag.getCompletedTaskProgress(), 0.05); } @@ -1168,7 +1168,7 @@ public class TestDAGImpl { setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true); dispatcher.getEventHandler().handle( new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT)); - dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), + dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), null)); dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState()); @@ -1183,7 +1183,8 @@ public class TestDAGImpl { DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0])); TezEvent tezEvent = new TezEvent(daEvent, new EventMetaData(EventProducerConsumerType.INPUT, 
"vertex1", "vertex2", ta1.getID())); - dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent))); + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent))); dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v2.getState()); @@ -1230,7 +1231,7 @@ public class TestDAGImpl { setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks); dispatcher.getEventHandler().handle( new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT)); - dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), + dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), null)); dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState()); @@ -1245,7 +1246,8 @@ public class TestDAGImpl { InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0); TezEvent tezEvent = new TezEvent(ireEvent, new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getID())); - dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent))); + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent))); dispatcher.await(); // Assert.assertEquals(VertexState.FAILED, v2.getState()); @@ -1318,12 +1320,14 @@ public class TestDAGImpl { Vertex v2 = groupDag.getVertex("vertex2"); dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED)); dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId())); - dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED)); dispatcher.await(); // commit should not happen due to vertex-rerunning Assert.assertEquals(0, 
TotalCountingOutputCommitter.totalCommitCounter); - dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED)); + dispatcher.getEventHandler().handle( + new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED)); dispatcher.await(); // commit happen Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter); @@ -1417,10 +1421,10 @@ public class TestDAGImpl { .newBuilder() .setClassName(CountingOutputCommitter.class.getName()) .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() - .setUserPayload( - ByteString - .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig( - true, false, false).toUserPayload())).build())) + .setUserPayload( + ByteString + .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig( + true, false, false).toUserPayload())).build())) .setName("output3") .setIODescriptor( TezEntityDescriptorProto.newBuilder().setClassName("output.class") @@ -1848,13 +1852,8 @@ public class TestDAGImpl { } } - // a dag.kill() on an active DAG races with vertices all succeeding. - // if a JOB_KILL is processed while dag is in running state, it should end in KILLED, - // regardless of whether all vertices complete - // - // Final state: - // DAG is in KILLED state, with killTrigger = USER_KILL - // Each vertex had kill triggered but raced ahead and ends in SUCCEEDED state. + // Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be + // marked as KILLED. 
@SuppressWarnings("unchecked") @Test(timeout = 5000) public void testDAGKill() { @@ -1870,22 +1869,70 @@ public class TestDAGImpl { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED)); dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.await(); + Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + Assert.assertEquals(2, dag.getSuccessfulVertices()); + + int killedCount = 0; + for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) { + if (vEntry.getValue().getState() == VertexState.KILLED) { + killedCount++; + } + } + Assert.assertEquals(4, killedCount); + + for (Vertex v : dag.getVertices().values()) { + Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause()); + } + + Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); + } + + // Vertices succeed after a DAG kill has been processed. Should be ignored. + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testDAGKillVertexSuccessAfterKill() { + initDAG(dag); + startDAG(dag); + dispatcher.await(); + + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( + TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( + TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED)); + dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.await(); + + Assert.assertEquals(DAGState.KILLED, dag.getState()); + + // Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored. 
for (int i = 2; i < 6; ++i) { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED)); } dispatcher.await(); - Assert.assertEquals(DAGState.KILLED, dag.getState()); + + int killedCount = 0; + for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) { + if (vEntry.getValue().getState() == VertexState.KILLED) { + killedCount++; + } + } + Assert.assertEquals(4, killedCount); + Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); - Assert.assertEquals(6, dag.getSuccessfulVertices()); + Assert.assertEquals(2, dag.getSuccessfulVertices()); for (Vertex v : dag.getVertices().values()) { Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause()); } Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); } - // job kill races with most vertices succeeding and one directly killed. - // because the job.kill() happens before the direct kill, the vertex has kill_trigger=DAG_KILL + // Vertex KILLED after a DAG_KILLED is issued. 
Termination reason should be DAG_KILL @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testDAGKillPending() { @@ -1900,13 +1947,14 @@ public class TestDAGImpl { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED)); - dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); for (int i = 2; i < 5; ++i) { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED)); } dispatcher.await(); + dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.await(); Assert.assertEquals(DAGState.KILLED, dag.getState()); dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( @@ -1914,7 +1962,8 @@ public class TestDAGImpl { dispatcher.await(); Assert.assertEquals(DAGState.KILLED, dag.getState()); Assert.assertEquals(5, dag.getSuccessfulVertices()); - Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL); + Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), + VertexTerminationCause.DAG_KILL); Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); }
