Repository: tez
Updated Branches:
refs/heads/branch-0.8 e25cb30ad -> 68e016f77
TEZ-1248. Reduce slow-start should special case 1 reducer runs. (Zhiyuan Yang
via hitesh)
(cherry picked from commit 8131896b393995e4c9de955847a107c7f000c7fb)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/68e016f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/68e016f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/68e016f7
Branch: refs/heads/branch-0.8
Commit: 68e016f77a7e4537d4509dfbca1de16954abe998
Parents: e25cb30
Author: Hitesh Shah <[email protected]>
Authored: Mon Jul 11 16:55:37 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Mon Jul 11 16:56:33 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../vertexmanager/ShuffleVertexManager.java | 6 ++-
.../vertexmanager/TestShuffleVertexManager.java | 42 +++++++++++++++++---
3 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/68e016f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ee8502c..5ddc8e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1248. Reduce slow-start should special case 1 reducer runs.
Release 0.8.4: 2016-07-08
http://git-wip-us.apache.org/repos/asf/tez/blob/68e016f7/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index aee8b6f..6104a1d 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -961,8 +961,10 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
tasksFractionToSchedule = Math.max(0, Math.min(1,
tasksFractionToSchedule));
- int numTasksToSchedule =
- ((int)(tasksFractionToSchedule * totalTasksToSchedule) -
+ // round up to avoid the corner case where a single task cannot be scheduled
until the src completed
+ // fraction reaches the max
+ int numTasksToSchedule =
+ ((int)(Math.ceil(tasksFractionToSchedule * totalTasksToSchedule)) -
(totalTasksToSchedule - numPendingTasks));
if (numTasksToSchedule > 0) {
http://git-wip-us.apache.org/repos/asf/tez/blob/68e016f7/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df1f080..2566c94 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -408,8 +408,8 @@ public class TestShuffleVertexManager {
verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertEquals(1, manager.pendingTasks.size());
- Assert.assertEquals(1, scheduledTasks.size());
+ Assert.assertEquals(0, manager.pendingTasks.size());
+ Assert.assertEquals(2, scheduledTasks.size());
Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);
@@ -805,13 +805,13 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 1);
+ Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
+ Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
scheduledTasks.clear();
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1,
3)); // we are done. no action
@@ -845,6 +845,38 @@ public class TestShuffleVertexManager {
Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
+ // if there is a single task to schedule, it should be scheduled when the src
completed
+ // fraction is more than the min slow start fraction
+ scheduledTasks.clear();
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1);
+ manager = createManager(conf, mockContext, 0.25f, 0.75f);
+ manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
+ Assert.assertTrue(manager.pendingTasks.size() == 1); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+ Assert.assertTrue(manager.pendingTasks.size() == 1);
+ Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 0);
+ Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ scheduledTasks.clear();
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
+ Assert.assertTrue(manager.pendingTasks.size() == 0);
+ Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+ scheduledTasks.clear();
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1,
3)); // we are done. no action
+ Assert.assertTrue(manager.pendingTasks.size() == 0);
+ Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
}