Repository: tez Updated Branches: refs/heads/branch-0.7 b47b405d6 -> f442c63f5
TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs (jeagles) (cherry picked from commit ed03611245423c89a9881af8bdc85ab909992a5d) (cherry picked from commit 7f8687f249e63b523d4260006cee1b5223828805) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f442c63f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f442c63f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f442c63f Branch: refs/heads/branch-0.7 Commit: f442c63f58ff2828c85374f94b74a42dc73d532e Parents: b47b405 Author: Jonathan Eagles <[email protected]> Authored: Wed Oct 19 16:56:26 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Oct 20 11:17:00 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../vertexmanager/ShuffleVertexManager.java | 47 ++++++---- .../vertexmanager/TestShuffleVertexManager.java | 94 +++++++++++++++++++- 3 files changed, 123 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f442c63f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e82e833..97b6292 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second. TEZ-3464. Fix findbugs warnings in tez-dag mainLoop TEZ-3335. 
DAG client thinks app is still running when app status is null http://git-wip-us.apache.org/repos/asf/tez/blob/f442c63f/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 3b2b669..462e28f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -65,6 +65,7 @@ import javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.BitSet; @@ -696,28 +697,44 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // Change this to use per partition stats for more accuracy TEZ-2962. // Instead of aggregating overall size and then dividing equally - coalesce partitions until // desired per partition size is achieved. 
- long expectedTotalSourceTasksOutputSize = 0; + BigInteger expectedTotalSourceTasksOutputSize = BigInteger.ZERO; for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) { SourceVertexInfo srcInfo = vInfo.getValue(); if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) { // this assumes that 1 vmEvent is received per completed task - TEZ-2961 - expectedTotalSourceTasksOutputSize += - (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived; + // Estimate total size by projecting based on the current average size per event + BigInteger srcOutputSize = BigInteger.valueOf(srcInfo.outputSize); + BigInteger srcNumTasks = BigInteger.valueOf(srcInfo.numTasks); + BigInteger srcNumVMEventsReceived = BigInteger.valueOf(srcInfo.numVMEventsReceived); + BigInteger expectedSrcOutputSize = srcOutputSize.multiply(srcNumTasks).divide(srcNumVMEventsReceived); + expectedTotalSourceTasksOutputSize = expectedTotalSourceTasksOutputSize.add(expectedSrcOutputSize); } } - LOG.info("Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: " - + completedSourceTasksOutputSize + " from " + numVertexManagerEventsReceived + " vertex manager events. " - + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" - + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" - + numBipartiteSourceTasksCompleted); - - int desiredTaskParallelism = - (int)( - (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/ - desiredTaskInputDataSize); - if(desiredTaskParallelism < minTaskParallelism) { - desiredTaskParallelism = minTaskParallelism; + LOG.info("Expected output: {} based on actual output: {} from {} vertex " + + "manager events. 
desiredTaskInputSize: {} max slow start tasks: {} " + + " num sources completed: {}", expectedTotalSourceTasksOutputSize, + completedSourceTasksOutputSize, numVertexManagerEventsReceived, + this.desiredTaskInputDataSize, + (totalNumBipartiteSourceTasks * this.slowStartMaxSrcCompletionFraction), + numBipartiteSourceTasksCompleted); + + // Calculate number of desired tasks by dividing with rounding up + BigInteger desiredTaskInputDataSize = BigInteger.valueOf(this.desiredTaskInputDataSize); + BigInteger desiredTaskInputDataSizeMinusOne = BigInteger.valueOf(this.desiredTaskInputDataSize - 1); + BigInteger bigDesiredTaskParallelism = + expectedTotalSourceTasksOutputSize.add(desiredTaskInputDataSizeMinusOne).divide(desiredTaskInputDataSize); + + if(bigDesiredTaskParallelism.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0) { + LOG.info("Not reducing auto parallelism for vertex: {}" + + " since the desired parallelism of {} is greater than or equal" + + " to the max parallelism of {}", getContext().getVertexName(), + bigDesiredTaskParallelism, Integer.MAX_VALUE); + return true; + } + int desiredTaskParallelism = bigDesiredTaskParallelism.intValue(); + if(desiredTaskParallelism < this.minTaskParallelism) { + desiredTaskParallelism = this.minTaskParallelism; } if(desiredTaskParallelism >= currentParallelism) { http://git-wip-us.apache.org/repos/asf/tez/blob/f442c63f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index df1f080..381ade3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -460,6 +460,70 @@ public class TestShuffleVertexManager { // parallelism changed due to small data size scheduledTasks.clear(); + // Ensure long overflow doesn't mistakenly reduce parallelism + // Overflow could previously occur when output size * num tasks for a single vertex would overflow max long + // + manager = createManager(conf, mockContext, 1.0f, 1.0f, (long)(Long.MAX_VALUE / 1.5)); + manager.onVertexStarted(emptyCompletions); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); + // task completion from non-bipartite stage does nothing + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); + Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + // First source 1 task completes + vmEvent = getVertexManagerEvent(null, 0L, mockSrcVertexId1); + manager.onVertexManagerEventReceived(vmEvent); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + Assert.assertEquals(4, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + Assert.assertEquals(0L, manager.completedSourceTasksOutputSize); + // Second source 1 task completes + vmEvent = getVertexManagerEvent(null, 0L, mockSrcVertexId1); + 
manager.onVertexManagerEventReceived(vmEvent); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + Assert.assertEquals(4, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(0L, manager.completedSourceTasksOutputSize); + // First source 2 task completes + vmEvent = getVertexManagerEvent(null, Long.MAX_VALUE >> 1 , mockSrcVertexId2); + manager.onVertexManagerEventReceived(vmEvent); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); + Assert.assertEquals(4, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + Assert.assertEquals(3, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(Long.MAX_VALUE >> 1, manager.completedSourceTasksOutputSize); + // Second source 2 task completes + vmEvent = getVertexManagerEvent(null, Long.MAX_VALUE >> 1 , mockSrcVertexId2); + manager.onVertexManagerEventReceived(vmEvent); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); + // Auto-reduce is triggered + verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); + Assert.assertEquals(2, newEdgeManagers.size()); + Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled + Assert.assertEquals(2, scheduledTasks.size()); + Assert.assertTrue(scheduledTasks.contains(new Integer(0))); + Assert.assertTrue(scheduledTasks.contains(new Integer(1))); + Assert.assertEquals(4, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(4, manager.numVertexManagerEventsReceived); + Assert.assertEquals(Long.MAX_VALUE >> 1 << 1, manager.completedSourceTasksOutputSize); + + //reset context for next test + 
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); + + // parallelism changed due to small data size + scheduledTasks.clear(); + manager = createManager(conf, mockContext, 0.5f, 0.5f); manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); @@ -493,8 +557,8 @@ public class TestShuffleVertexManager { manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); // managedVertex tasks reduced - verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(4)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); // TODO improve tests for parallelism Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled @@ -507,7 +571,7 @@ public class TestShuffleVertexManager { // more completions dont cause recalculation of parallelism manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next(); @@ -1347,7 +1411,17 @@ public class TestShuffleVertexManager { } private ShuffleVertexManager createManager(Configuration conf, - VertexManagerPluginContext context, Float min, Float max) { + VertexManagerPluginContext context, Float min, Float 
max, Long size) { + return createShuffleVertexManager(conf, context, min, max, size); + } + + private ShuffleVertexManager createManager(Configuration conf, + VertexManagerPluginContext context, Float min, Float max) { + return createShuffleVertexManager(conf, context, min, max, null); + } + + private ShuffleVertexManager createShuffleVertexManager(Configuration conf, + VertexManagerPluginContext context, Float min, Float max, Long size) { if (min != null) { conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); } else { @@ -1358,6 +1432,18 @@ public class TestShuffleVertexManager { } else { conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION); } + conf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + true); + if (size != null) { + conf.setLong( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + size); + } else { + conf.setLong( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + 1000L); + } UserPayload payload; try { payload = TezUtils.createUserPayloadFromConf(conf);
