Repository: tez
Updated Branches:
  refs/heads/branch-0.7 4d44c5854 -> 1db5d1878


TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)
(cherry picked from commit c4487f966a81c01ed061d502a397e2cf3b4bce44)

Conflicts:
        CHANGES.txt
        tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ff7930c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ff7930c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ff7930c

Branch: refs/heads/branch-0.7
Commit: 4ff7930c049421f4b46d017f73b90c3e8cde4efc
Parents: 4d44c58
Author: Bikas Saha <[email protected]>
Authored: Tue Nov 24 18:48:23 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Fri Dec 11 18:06:35 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../vertexmanager/ShuffleVertexManager.java     | 15 +++++++------
 .../vertexmanager/TestShuffleVertexManager.java | 23 ++++++++++++++------
 3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04076ae..fd2749e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2956. Handle auto-reduce parallelism when the
+  totalNumBipartiteSourceTasks is 0
   TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
   instead of tasks
   TEZ-2824. Add javadocs for Vertex.setConf and DAG.setConf.

http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 308579b..9fb5d1e 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -570,12 +570,10 @@ public class ShuffleVertexManager extends 
VertexManagerPlugin {
    */
   @VisibleForTesting
   boolean determineParallelismAndApply() {
-    if(numBipartiteSourceTasksCompleted == 0) {
-      return true;
-    }
-    
     if(numVertexManagerEventsReceived == 0) {
-      return true;
+      if (totalNumBipartiteSourceTasks > 0) {
+        return true;
+      }
     }
     
     int currentParallelism = pendingTasks.size();
@@ -598,8 +596,11 @@ public class ShuffleVertexManager extends 
VertexManagerPlugin {
       return false;
     }
 
-    long expectedTotalSourceTasksOutputSize =
-        (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / 
numVertexManagerEventsReceived;
+    long expectedTotalSourceTasksOutputSize = 0;
+    if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 
) {
+      expectedTotalSourceTasksOutputSize =
+          (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / 
numVertexManagerEventsReceived;
+    }
 
     int desiredTaskParallelism = 
         (int)(

http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df08060..f3f3444 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -180,7 +180,8 @@ public class TestShuffleVertexManager {
 
     doAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) throws Exception {
-          
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+          final int numTasks = 
((Integer)invocation.getArguments()[0]).intValue();
+          
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks);
           newEdgeManagers.clear();
           for (Entry<String, EdgeProperty> entry :
               ((Map<String, 
EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
@@ -210,7 +211,7 @@ public class TestShuffleVertexManager {
 
               @Override
               public int getDestinationVertexNumTasks() {
-                return 2;
+                return numTasks;
               }
             };
             EdgeManagerPlugin edgeManager = ReflectionUtils
@@ -220,7 +221,7 @@ public class TestShuffleVertexManager {
             newEdgeManagers.put(entry.getKey(), edgeManager);
           }
           return null;
-      }}).when(mockContext).reconfigureVertex(eq(2), 
any(VertexLocationHint.class), anyMap());
+      }}).when(mockContext).reconfigureVertex(anyInt(), 
any(VertexLocationHint.class), anyMap());
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of 
reconfig
@@ -238,12 +239,15 @@ public class TestShuffleVertexManager {
     // source vertices have 0 tasks. so only 1 notification needed. triggers 
scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, 
VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
+    verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
-    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and 
parallelism changed
     scheduledTasks.clear();
     // TODO TEZ-1714 locking verify(mockContext, 
times(1)).vertexManagerDone(); // notified after scheduling all tasks
 
     // check scheduling only after onVertexStarted
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of 
reconfig
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
     // source vertices have 0 tasks. so only 1 notification needed. does not 
trigger scheduling
@@ -254,13 +258,16 @@ public class TestShuffleVertexManager {
     // trigger start and processing of pending notification events
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and 
parallelism changed
 
     
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
 
     VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
     // parallelism not change due to large data size
@@ -274,11 +281,13 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, 
VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, 
VertexState.CONFIGURED));
     
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-    verify(mockContext, times(0)).reconfigureVertex(anyInt(), 
any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, 
VertexState.CONFIGURED));
-    verify(mockContext, times(0)).reconfigureVertex(anyInt(), 
any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());

Reply via email to