Repository: tez
Updated Branches:
refs/heads/branch-0.7 4d44c5854 -> 1db5d1878
TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks
is 0 (Rajesh Balamohan and Bikas Saha)
(cherry picked from commit c4487f966a81c01ed061d502a397e2cf3b4bce44)
Conflicts:
CHANGES.txt
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ff7930c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ff7930c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ff7930c
Branch: refs/heads/branch-0.7
Commit: 4ff7930c049421f4b46d017f73b90c3e8cde4efc
Parents: 4d44c58
Author: Bikas Saha <[email protected]>
Authored: Tue Nov 24 18:48:23 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Fri Dec 11 18:06:35 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../vertexmanager/ShuffleVertexManager.java | 15 +++++++------
.../vertexmanager/TestShuffleVertexManager.java | 23 ++++++++++++++------
3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04076ae..fd2749e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2956. Handle auto-reduce parallelism when the
+ totalNumBipartiteSourceTasks is 0
TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
instead of tasks
TEZ-2824. Add javadocs for Vertex.setConf and DAG.setConf.
http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 308579b..9fb5d1e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -570,12 +570,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
*/
@VisibleForTesting
boolean determineParallelismAndApply() {
- if(numBipartiteSourceTasksCompleted == 0) {
- return true;
- }
-
if(numVertexManagerEventsReceived == 0) {
- return true;
+ if (totalNumBipartiteSourceTasks > 0) {
+ return true;
+ }
}
int currentParallelism = pendingTasks.size();
@@ -598,8 +596,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
return false;
}
- long expectedTotalSourceTasksOutputSize =
- (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) /
numVertexManagerEventsReceived;
+ long expectedTotalSourceTasksOutputSize = 0;
+ if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0
) {
+ expectedTotalSourceTasksOutputSize =
+ (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) /
numVertexManagerEventsReceived;
+ }
int desiredTaskParallelism =
(int)(
http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df08060..f3f3444 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -180,7 +180,8 @@ public class TestShuffleVertexManager {
doAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) throws Exception {
-
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+ final int numTasks =
((Integer)invocation.getArguments()[0]).intValue();
+
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks);
newEdgeManagers.clear();
for (Entry<String, EdgeProperty> entry :
((Map<String,
EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
@@ -210,7 +211,7 @@ public class TestShuffleVertexManager {
@Override
public int getDestinationVertexNumTasks() {
- return 2;
+ return numTasks;
}
};
EdgeManagerPlugin edgeManager = ReflectionUtils
@@ -220,7 +221,7 @@ public class TestShuffleVertexManager {
newEdgeManagers.put(entry.getKey(), edgeManager);
}
return null;
- }}).when(mockContext).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
+ }}).when(mockContext).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
// check initialization
manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of
reconfig
@@ -238,12 +239,15 @@ public class TestShuffleVertexManager {
// source vertices have 0 tasks. so only 1 notification needed. triggers
scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.isEmpty());
+ verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
- Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+ Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and
parallelism changed
scheduledTasks.clear();
// TODO TEZ-1714 locking verify(mockContext,
times(1)).vertexManagerDone(); // notified after scheduling all tasks
// check scheduling only after onVertexStarted
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of
reconfig
verify(mockContext, times(3)).vertexReconfigurationPlanned();
// source vertices have 0 tasks. so only 1 notification needed. does not
trigger scheduling
@@ -254,13 +258,16 @@ public class TestShuffleVertexManager {
// trigger start and processing of pending notification events
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
+ verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+ Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and
parallelism changed
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
// parallelism not change due to large data size
@@ -274,11 +281,13 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- verify(mockContext, times(0)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex();
// trigger scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- verify(mockContext, times(0)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());