Repository: tez
Updated Branches:
refs/heads/branch-0.7 4a7441731 -> 06da29aec
TEZ-2857. Fix flakey tests in TestDAGImpl. (sseth)
(cherry picked from commit d63d6ee600464662670058485492ec56ae13cffe)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bb063c6a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bb063c6a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bb063c6a
Branch: refs/heads/branch-0.7
Commit: bb063c6ade787eec84361b28c2695e9b2bfbfad2
Parents: 4a74417
Author: Siddharth Seth <[email protected]>
Authored: Mon Sep 28 17:47:50 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Mon Sep 28 17:51:50 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TestDAGImpl.java | 99 +++++++++++++++-----
2 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/bb063c6a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d51eed..1c17702 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES
+ TEZ-2857. Fix flakey tests in TestDAGImpl.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2808. Race condition between preemption and container assignment
TEZ-2853. Tez UI: task attempt page is coming empty
http://git-wip-us.apache.org/repos/asf/tez/blob/bb063c6a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index e69db0f..49f534b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1007,7 +1007,7 @@ public class TestDAGImpl {
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.INIT_FAILURE,
dag.getTerminationCause());
- Assert.assertTrue(StringUtils.join(dag.getDiagnostics(),",")
+ Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), ",")
.contains("Vertex's TaskResource is beyond the cluster container
capability"));
}
@@ -1093,7 +1093,7 @@ public class TestDAGImpl {
Assert.assertEquals(1, dag.getSuccessfulVertices());
// 2 tasks completed, total plan has 11 vertices
- Assert.assertEquals((float)2/11,
+ Assert.assertEquals((float) 2 / 11,
dag.getCompletedTaskProgress(), 0.05);
}
@@ -1168,7 +1168,7 @@ public class TestDAGImpl {
setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination,
true);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
- dispatcher.getEventHandler().handle(new
DAGEventStartDag(dagWithCustomEdge.getID(),
+ dispatcher.getEventHandler().handle(new
DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
@@ -1183,7 +1183,8 @@ public class TestDAGImpl {
DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new
byte[0]));
TezEvent tezEvent = new TezEvent(daEvent,
new EventMetaData(EventProducerConsumerType.INPUT, "vertex1",
"vertex2", ta1.getID()));
- dispatcher.getEventHandler().handle(new
VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v2.getVertexId(),
Lists.newArrayList(tezEvent)));
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1230,7 +1231,7 @@ public class TestDAGImpl {
setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
- dispatcher.getEventHandler().handle(new
DAGEventStartDag(dagWithCustomEdge.getID(),
+ dispatcher.getEventHandler().handle(new
DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
@@ -1245,7 +1246,8 @@ public class TestDAGImpl {
InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
TezEvent tezEvent = new TezEvent(ireEvent,
new EventMetaData(EventProducerConsumerType.INPUT,"vertex2",
"vertex1", ta1.getID()));
- dispatcher.getEventHandler().handle(new
VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v2.getVertexId(),
Lists.newArrayList(tezEvent)));
dispatcher.await();
//
Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1318,12 +1320,14 @@ public class TestDAGImpl {
Vertex v2 = groupDag.getVertex("vertex2");
dispatcher.getEventHandler().handle(new
DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new
DAGEventVertexReRunning(v1.getVertexId()));
- dispatcher.getEventHandler().handle(new
DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(
+ new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// commit should not happen due to vertex-rerunning
Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
- dispatcher.getEventHandler().handle(new
DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(
+ new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// commit happen
Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
@@ -1417,10 +1421,10 @@ public class TestDAGImpl {
.newBuilder()
.setClassName(CountingOutputCommitter.class.getName())
.setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
- .setUserPayload(
- ByteString
- .copyFrom(new
CountingOutputCommitter.CountingOutputCommitterConfig(
- true, false, false).toUserPayload())).build()))
+ .setUserPayload(
+ ByteString
+ .copyFrom(new
CountingOutputCommitter.CountingOutputCommitterConfig(
+ true, false, false).toUserPayload())).build()))
.setName("output3")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class")
@@ -1848,13 +1852,8 @@ public class TestDAGImpl {
}
}
- // a dag.kill() on an active DAG races with vertices all succeeding.
- // if a JOB_KILL is processed while dag is in running state, it should end
in KILLED,
- // regardless of whether all vertices complete
- //
- // Final state:
- // DAG is in KILLED state, with killTrigger = USER_KILL
- // Each vertex had kill triggered but raced ahead and ends in SUCCEEDED
state.
+ // Two vertices succeed, then DAG_KILL is processed, which causes the rest of the vertices
+ // to be marked as KILLED.
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGKill() {
@@ -1870,22 +1869,70 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEvent(dagId,
DAGEventType.DAG_KILL));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(DAGTerminationCause.DAG_KILL,
dag.getTerminationCause());
+ Assert.assertEquals(2, dag.getSuccessfulVertices());
+
+ int killedCount = 0;
+ for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet())
{
+ if (vEntry.getValue().getState() == VertexState.KILLED) {
+ killedCount++;
+ }
+ }
+ Assert.assertEquals(4, killedCount);
+
+ for (Vertex v : dag.getVertices().values()) {
+ Assert.assertEquals(VertexTerminationCause.DAG_KILL,
v.getTerminationCause());
+ }
+
+ Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+ }
+
+ // Vertices succeed after a DAG kill has been processed. Should be ignored.
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testDAGKillVertexSuccessAfterKill() {
+ initDAG(dag);
+ startDAG(dag);
+ dispatcher.await();
+
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+ TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+ TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(new DAGEvent(dagId,
DAGEventType.DAG_KILL));
+ dispatcher.await();
+
+ Assert.assertEquals(DAGState.KILLED, dag.getState());
+
+ // Vertex SUCCEEDED events processed after the DAG has reached the KILLED state should be ignored.
for (int i = 2; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
- Assert.assertEquals(DAGState.KILLED, dag.getState());
+
+ int killedCount = 0;
+ for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet())
{
+ if (vEntry.getValue().getState() == VertexState.KILLED) {
+ killedCount++;
+ }
+ }
+ Assert.assertEquals(4, killedCount);
+
Assert.assertEquals(DAGTerminationCause.DAG_KILL,
dag.getTerminationCause());
- Assert.assertEquals(6, dag.getSuccessfulVertices());
+ Assert.assertEquals(2, dag.getSuccessfulVertices());
for (Vertex v : dag.getVertices().values()) {
Assert.assertEquals(VertexTerminationCause.DAG_KILL,
v.getTerminationCause());
}
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
- // job kill races with most vertices succeeding and one directly killed.
- // because the job.kill() happens before the direct kill, the vertex has
kill_trigger=DAG_KILL
+ // Vertex KILLED after a DAG_KILL is issued. Its termination cause should be DAG_KILL.
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGKillPending() {
@@ -1900,13 +1947,14 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
- dispatcher.getEventHandler().handle(new DAGEvent(dagId,
DAGEventType.DAG_KILL));
for (int i = 2; i < 5; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
+ dispatcher.getEventHandler().handle(new DAGEvent(dagId,
DAGEventType.DAG_KILL));
+ dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
@@ -1914,7 +1962,8 @@ public class TestDAGImpl {
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
- Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId,
5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
+ Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId,
5)).getTerminationCause(),
+ VertexTerminationCause.DAG_KILL);
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}