Repository: tez Updated Branches: refs/heads/master 608e15eee -> 8131896b3
TEZ-1248. Reduce slow-start should special case 1 reducer runs. (Zhiyuan Yang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8131896b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8131896b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8131896b Branch: refs/heads/master Commit: 8131896b393995e4c9de955847a107c7f000c7fb Parents: 608e15e Author: Hitesh Shah <[email protected]> Authored: Mon Jul 11 16:55:37 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Jul 11 16:55:37 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../vertexmanager/ShuffleVertexManager.java | 6 ++- .../vertexmanager/TestShuffleVertexManager.java | 42 +++++++++++++++++--- 3 files changed, 44 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8131896b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dc1841c..7c1f0cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1248. Reduce slow-start should special case 1 reducer runs. TEZ-3327. ATS Parser: Populate config details available in dag. TEZ-3325. Flaky test in TestDAGImpl.testCounterLimits. TEZ-3323. Update licese and notice for xml-apis jar. Also update year in notice to 2016. @@ -78,6 +79,8 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1248. Reduce slow-start should special case 1 reducer runs. + Release 0.8.4: 2016-07-08 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/tez/blob/8131896b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index b83c64e..7d9822c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -961,8 +961,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin { tasksFractionToSchedule = Math.max(0, Math.min(1, tasksFractionToSchedule)); - int numTasksToSchedule = - ((int)(tasksFractionToSchedule * totalTasksToSchedule) - + // round up to avoid the corner case that single task cannot be scheduled until src completed + // fraction reach max + int numTasksToSchedule = + ((int)(Math.ceil(tasksFractionToSchedule * totalTasksToSchedule)) - (totalTasksToSchedule - numPendingTasks)); if (numTasksToSchedule > 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/8131896b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index df1f080..2566c94 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -408,8 +408,8 @@ public class TestShuffleVertexManager { verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertEquals(1, manager.pendingTasks.size()); - Assert.assertEquals(1, scheduledTasks.size()); + Assert.assertEquals(0, manager.pendingTasks.size()); + Assert.assertEquals(2, scheduledTasks.size()); Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(3, manager.numVertexManagerEventsReceived); Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize); @@ -805,13 +805,13 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 2); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled + Assert.assertTrue(manager.pendingTasks.size() == 1); + Assert.assertTrue(scheduledTasks.size() == 2); // 2 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled + Assert.assertTrue(scheduledTasks.size() == 1); // 1 tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); scheduledTasks.clear(); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action @@ -845,6 +845,38 @@ public class TestShuffleVertexManager { Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8); + // if there is single task to schedule, it should be schedule when src completed + // fraction is more than min slow start fraction + scheduledTasks.clear(); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1); + manager = createManager(conf, mockContext, 0.25f, 0.75f); + manager.onVertexStarted(emptyCompletions); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertTrue(manager.pendingTasks.size() == 1); // no tasks scheduled + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); + Assert.assertTrue(manager.pendingTasks.size() == 1); + Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 0); + Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); + scheduledTasks.clear(); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); + Assert.assertTrue(manager.pendingTasks.size() == 0); + Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); + scheduledTasks.clear(); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action + Assert.assertTrue(manager.pendingTasks.size() == 0); + Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7); }
