Repository: tez
Updated Branches:
  refs/heads/branch-0.7 1b3fbe12c -> 083462d0b
TEZ-3126. Log reason for not reducing parallelism (jeagles)

(cherry picked from commit fd75e640396da8d5e1c67ef554d5db1846e08c69)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/083462d0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/083462d0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/083462d0

Branch: refs/heads/branch-0.7
Commit: 083462d0b65fa38e634103bb6a5726762fc4e8e3
Parents: 1b3fbe1
Author: Jonathan Eagles <[email protected]>
Authored: Mon Feb 22 22:45:23 2016 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Mon Feb 22 22:49:12 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../vertexmanager/ShuffleVertexManager.java     | 28 ++++++++++++--------
 2 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/083462d0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 35f773f..3efe9fb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-3126. Log reason for not reducing parallelism
   TEZ-3123. Containers can get re-used even with conflicting local resources.
   TEZ-3117. Deadlock in Edge and Vertex code
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
http://git-wip-us.apache.org/repos/asf/tez/blob/083462d0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 410ad73..b11ed3f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -706,6 +706,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } } + LOG.info("Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: " + + completedSourceTasksOutputSize + " from " + numVertexManagerEventsReceived + " vertex manager events. " + + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" + + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" + + numBipartiteSourceTasksCompleted); + int desiredTaskParallelism = (int)( (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/ @@ -713,16 +719,22 @@ public class ShuffleVertexManager extends VertexManagerPlugin { if(desiredTaskParallelism < minTaskParallelism) { desiredTaskParallelism = minTaskParallelism; } - + if(desiredTaskParallelism >= currentParallelism) { + LOG.info("Not reducing auto parallelism for vertex: " + getContext().getVertexName() + + " since the desired parallelism of " + desiredTaskParallelism + + " is greater than or equal to the current parallelism of " + pendingTasks.size()); return true; } - + // most shufflers will be assigned this range basePartitionRange = currentParallelism/desiredTaskParallelism; if (basePartitionRange <= 1) { // nothing to do if range is equal 1 partition. 
shuffler does it by default + LOG.info("Not reducing auto parallelism for vertex: " + getContext().getVertexName() + + " by less than half since combining two inputs will potentially break the desired task input size of " + + desiredTaskInputDataSize); return true; } @@ -732,15 +744,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin { int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ? (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange); - LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName() - + " to " + finalTaskParallelism + " from " + pendingTasks.size() - + " . Expected output: " + expectedTotalSourceTasksOutputSize - + " based on actual output: " + completedSourceTasksOutputSize - + " from " + numVertexManagerEventsReceived + " vertex manager events. " - + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" + - (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" + - numBipartiteSourceTasksCompleted); - + LOG.info("Reducing auto parallelism for vertex: " + getContext().getVertexName() + + " from " + pendingTasks.size() + " to " + finalTaskParallelism); + if(finalTaskParallelism < currentParallelism) { // final parallelism is less than actual parallelism Map<String, EdgeProperty> edgeProperties =
