Repository: tez
Updated Branches:
  refs/heads/master d9560b904 -> c4487f966
TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c4487f96
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c4487f96
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c4487f96

Branch: refs/heads/master
Commit: c4487f966a81c01ed061d502a397e2cf3b4bce44
Parents: d9560b9
Author: Bikas Saha <[email protected]>
Authored: Tue Nov 24 18:48:23 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Tue Nov 24 18:48:23 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt | 2 ++
 .../vertexmanager/ShuffleVertexManager.java | 21 ++++++++++--------
 .../vertexmanager/TestShuffleVertexManager.java | 23 ++++++++++++++------
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/c4487f96/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 990c681..59847ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2956. Handle auto-reduce parallelism when the
+  totalNumBipartiteSourceTasks is 0
   TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in counters page without any delay
   TEZ-2946. Tez UI: At times RM return a huge error message making the yellow error bar to fill the whole screen
   TEZ-2949. Allow duplicate dag names within session for Tez.

http://git-wip-us.apache.org/repos/asf/tez/blob/c4487f96/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 5fb4df9..f10c89a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -641,12 +641,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin { */ @VisibleForTesting boolean determineParallelismAndApply() { - if(numBipartiteSourceTasksCompleted == 0) { - return true; - } - if(numVertexManagerEventsReceived == 0) { - return true; + if (totalNumBipartiteSourceTasks > 0) { + return true; + } } int currentParallelism = pendingTasks.size(); @@ -669,8 +667,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin { return false; } - long expectedTotalSourceTasksOutputSize = - (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived; + long expectedTotalSourceTasksOutputSize = 0; + if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) { + expectedTotalSourceTasksOutputSize = + (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived; + } int desiredTaskParallelism = (int)( @@ -770,8 +771,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } getContext().doneReconfiguringVertex(); } - //Sort in case partition stats are available - sortPendingTasksBasedOnDataSize(); + if (totalNumBipartiteSourceTasks > 0) { + //Sort in case partition stats are available + sortPendingTasksBasedOnDataSize(); + } List<ScheduleTaskRequest> 
scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule); while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/c4487f96/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 965e99c..862e4df 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -186,7 +186,8 @@ public class TestShuffleVertexManager { doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) throws Exception { - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); + final int numTasks = ((Integer)invocation.getArguments()[0]).intValue(); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks); newEdgeManagers.clear(); for (Entry<String, EdgeProperty> entry : ((Map<String, EdgeProperty>)invocation.getArguments()[2]).entrySet()) { @@ -216,7 +217,7 @@ public class TestShuffleVertexManager { @Override public int getDestinationVertexNumTasks() { - return 2; + return numTasks; } }; EdgeManagerPlugin edgeManager = ReflectionUtils @@ -226,7 +227,7 @@ public class TestShuffleVertexManager { newEdgeManagers.put(entry.getKey(), edgeManager); } return null; - }}).when(mockContext).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); + }}).when(mockContext).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig @@ -244,12 +245,15 @@ 
public class TestShuffleVertexManager { // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.isEmpty()); + verify(mockContext, times(1)).reconfigureVertex(anyInt(), any + (VertexLocationHint.class), anyMap()); verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done - Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled + Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed scheduledTasks.clear(); // TODO TEZ-1714 locking verify(mockContext, times(1)).vertexManagerDone(); // notified after scheduling all tasks // check scheduling only after onVertexStarted + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig verify(mockContext, times(3)).vertexReconfigurationPlanned(); // source vertices have 0 tasks. so only 1 notification needed. 
does not trigger scheduling @@ -260,13 +264,16 @@ public class TestShuffleVertexManager { // trigger start and processing of pending notification events manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 2); + verify(mockContext, times(2)).reconfigureVertex(anyInt(), any + (VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled + Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex"); // parallelism not change due to large data size @@ -280,11 +287,13 @@ public class TestShuffleVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(2)).reconfigureVertex(anyInt(), any + (VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // trigger scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(2)).reconfigureVertex(anyInt(), any + (VertexLocationHint.class), anyMap()); verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done Assert.assertEquals(0, 
manager.pendingTasks.size()); // all tasks scheduled Assert.assertEquals(4, scheduledTasks.size());
