Repository: tez Updated Branches: refs/heads/master f8e014876 -> 7221d386a
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7221d386 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7221d386 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7221d386 Branch: refs/heads/master Commit: 7221d386a4fbc1f32aae1854bd25defb4c6d557a Parents: f8e0148 Author: Jason Lowe <[email protected]> Authored: Thu Apr 28 13:55:49 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Thu Apr 28 13:55:49 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../vertexmanager/ShuffleVertexManager.java | 7 +-- .../vertexmanager/TestShuffleVertexManager.java | 52 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7221d386/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 324ca38..3979725 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.9.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput. TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests. TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. @@ -25,6 +26,7 @@ Release 0.8.4: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. TEZ-3224. User payload is not initialized before creating vertex manager plugin. TEZ-3226. Tez UI 2: All DAGs UX improvements. @@ -462,6 +464,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks TEZ-3224. User payload is not initialized before creating vertex manager plugin. TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs TEZ-3202. Reduce the memory need for jobs with high number of segments http://git-wip-us.apache.org/repos/asf/tez/blob/7221d386/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 47fc60f..aee8b6f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -909,10 +909,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // vertex not started yet return; } - int numPendingTasks = pendingTasks.size(); - if (numPendingTasks == 0) { - return; - } if (!sourceVerticesScheduled && !canScheduleTasks()) { if (LOG.isDebugEnabled()) { @@ -922,7 +918,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin { return; } - if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks && numPendingTasks > 0) { + int numPendingTasks = pendingTasks.size(); + if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) { LOG.info("All source tasks assigned. " + "Ramping up " + numPendingTasks + " remaining tasks for vertex: " + getContext().getVertexName()); http://git-wip-us.apache.org/repos/asf/tez/blob/7221d386/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 9c21aed..df1f080 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -1281,6 +1281,58 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); } + + @Test + public void testZeroTasksSendsConfigured() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + true); + conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); + ShuffleVertexManager manager = null; + + HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); + String r1 = "R1"; + EdgeProperty eProp1 = EdgeProperty.create( + EdgeProperty.DataMovementType.SCATTER_GATHER, + EdgeProperty.DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, + OutputDescriptor.create("out"), + InputDescriptor.create("in")); + + final String mockManagedVertexId = "R2"; + mockInputVertices.put(r1, eProp1); + + final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); + when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); + when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0); + + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1); + // check initialization + manager = createManager(conf, mockContext, 0.001f, 0.001f); + + final HashSet<Integer> scheduledTasks = new HashSet<Integer>(); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + scheduledTasks.clear(); + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { + scheduledTasks.add(task.getTaskIndex()); + } + return null; + }}).when(mockContext).scheduleTasks(anyList()); + + manager.onVertexStarted(emptyCompletions); + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + Assert.assertEquals(1, manager.bipartiteSources); + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(0, scheduledTasks.size()); + verify(mockContext).doneReconfiguringVertex(); + } public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) { VertexIdentifier mockVertex = mock(VertexIdentifier.class);
