TEZ-3792. RootInputVertexManager doesn't drain queued source-task-completed events (Eric Badger via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6777707f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6777707f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6777707f Branch: refs/heads/branch-0.9.0 Commit: 6777707fb2a93ea8a8ffa05f290d9555e77d4dc6 Parents: 07d9146 Author: Jonathan Eagles <[email protected]> Authored: Fri Jul 14 10:42:23 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri Jul 14 10:42:23 2017 -0500 ---------------------------------------------------------------------- .../app/dag/impl/RootInputVertexManager.java | 5 +++ .../dag/impl/TestRootInputVertexManager.java | 33 ++++++++++++++++++++ .../TestShuffleVertexManagerBase.java | 29 ++++++++++++++++- 3 files changed, 66 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6777707f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java index 3205983..38eba0e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java @@ -171,6 +171,11 @@ public class RootInputVertexManager extends VertexManagerPlugin { + srcVertex + " as it has " + numTasks + " tasks"); } } + if (completions != null) { + for (TaskAttemptIdentifier attempt : completions) { + onSourceTaskCompleted(attempt); + } + } onVertexStartedDone.set(true); // track the tasks in this vertex updatePendingTasks(); http://git-wip-us.apache.org/repos/asf/tez/blob/6777707f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java index 50bac69..16a97d4 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -491,6 +492,38 @@ public class TestRootInputVertexManager { Assert.assertEquals(manager.numSourceTasksCompleted, 7); } + @Test + public void testTezDrainCompletionsOnVertexStart() throws IOException { + Configuration conf = new Configuration(); + RootInputVertexManager manager = null; + HashMap<String, EdgeProperty> mockInputVertices = + new HashMap<String, EdgeProperty>(); + String mockSrcVertexId1 = "Vertex1"; + EdgeProperty eProp1 = EdgeProperty.create( + EdgeProperty.DataMovementType.BROADCAST, + EdgeProperty.DataSourceType.PERSISTED, + EdgeProperty.SchedulingType.SEQUENTIAL, + OutputDescriptor.create("out"), + InputDescriptor.create("in")); + + VertexManagerPluginContext mockContext = + mock(VertexManagerPluginContext.class); + when(mockContext.getVertexStatistics(any(String.class))) + .thenReturn(mock(VertexStatistics.class)); + when(mockContext.getInputVertexEdgeProperties()) + .thenReturn(mockInputVertices); + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); + + mockInputVertices.put(mockSrcVertexId1, eProp1); + + // check initialization + manager = createRootInputVertexManager(conf, mockContext, 0.1f, 0.1f); + Assert.assertEquals(0, manager.numSourceTasksCompleted); + manager.onVertexStarted(Collections.singletonList( + 
createTaskAttemptIdentifier(mockSrcVertexId1, 0))); + Assert.assertEquals(1, manager.numSourceTasksCompleted); + } + static RootInputVertexManager createRootInputVertexManager( Configuration conf, VertexManagerPluginContext context, Float min, http://git-wip-us.apache.org/repos/asf/tez/blob/6777707f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java index 2e97381..96f46d6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java @@ -44,6 +44,7 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1106,7 +1107,33 @@ public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils Assert.assertEquals(0, scheduledTasks.size()); verify(mockContext).doneReconfiguringVertex(); } - + + + @Test(timeout=5000) + public void testTezDrainCompletionsOnVertexStart() throws IOException { + Configuration conf = new Configuration(); + ShuffleVertexManagerBase manager; + + final String mockSrcVertexId1 = "Vertex1"; + final String mockSrcVertexId2 = "Vertex2"; + final String mockSrcVertexId3 = "Vertex3"; + final String mockManagedVertexId = "Vertex4"; + + final List<Integer> scheduledTasks = Lists.newLinkedList(); + + final VertexManagerPluginContext mockContext = createVertexManagerContext( + mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2, + 
mockManagedVertexId, 4, scheduledTasks, null); + + //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself. + manager = createManager(conf, mockContext, 0.01f, 0.75f); + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); + Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); + + } + private ShuffleVertexManagerBase createManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) { return createManager(this.shuffleVertexManagerClass, conf, context, true,
