Repository: tez
Updated Branches:
  refs/heads/master 608e15eee -> 8131896b3


TEZ-1248. Reduce slow-start should special case 1 reducer runs. (Zhiyuan Yang 
via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8131896b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8131896b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8131896b

Branch: refs/heads/master
Commit: 8131896b393995e4c9de955847a107c7f000c7fb
Parents: 608e15e
Author: Hitesh Shah <[email protected]>
Authored: Mon Jul 11 16:55:37 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Mon Jul 11 16:55:37 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../vertexmanager/ShuffleVertexManager.java     |  6 ++-
 .../vertexmanager/TestShuffleVertexManager.java | 42 +++++++++++++++++---
 3 files changed, 44 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8131896b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dc1841c..7c1f0cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-1248. Reduce slow-start should special case 1 reducer runs.
   TEZ-3327. ATS Parser: Populate config details available in dag.
   TEZ-3325. Flaky test in TestDAGImpl.testCounterLimits.
   TEZ-3323. Update licese and notice for xml-apis jar. Also update year in notice to 2016.
@@ -78,6 +79,8 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-1248. Reduce slow-start should special case 1 reducer runs.
+
 Release 0.8.4: 2016-07-08
 
 INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/tez/blob/8131896b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index b83c64e..7d9822c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -961,8 +961,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     
     tasksFractionToSchedule = Math.max(0, Math.min(1, tasksFractionToSchedule));
 
-    int numTasksToSchedule = 
-        ((int)(tasksFractionToSchedule * totalTasksToSchedule) - 
+    // round up to avoid the corner case that single task cannot be scheduled until src completed
+    // fraction reach max
+    int numTasksToSchedule =
+        ((int)(Math.ceil(tasksFractionToSchedule * totalTasksToSchedule)) -
          (totalTasksToSchedule - numPendingTasks));
     
     if (numTasksToSchedule > 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/8131896b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df1f080..2566c94 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -408,8 +408,8 @@ public class TestShuffleVertexManager {
     verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    Assert.assertEquals(1, manager.pendingTasks.size());
-    Assert.assertEquals(1, scheduledTasks.size());
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(2, scheduledTasks.size());
     Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);
@@ -805,13 +805,13 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    Assert.assertTrue(manager.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.pendingTasks.size() == 1);
+    Assert.assertTrue(scheduledTasks.size() == 2); // 2 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
-    Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
     scheduledTasks.clear();
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action
@@ -845,6 +845,38 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
 
+    // if there is single task to schedule, it should be schedule when src completed
+    // fraction is more than min slow start fraction
+    scheduledTasks.clear();
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1);
+    manager = createManager(conf, mockContext, 0.25f, 0.75f);
+    manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.pendingTasks.size() == 1); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    Assert.assertTrue(manager.pendingTasks.size() == 1);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    scheduledTasks.clear();
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
+    Assert.assertTrue(manager.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+    scheduledTasks.clear();
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action
+    Assert.assertTrue(manager.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
   }
 
 

Reply via email to