Repository: tez Updated Branches: refs/heads/master ec9135145 -> 8247a643f
TEZ-3666. Integer overflow in ShuffleVertexManagerBase (Ming Ma via zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8247a643 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8247a643 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8247a643 Branch: refs/heads/master Commit: 8247a643f9fd62b270395ae255036706f5153d7c Parents: ec91351 Author: Zhiyuan Yang <[email protected]> Authored: Thu Oct 26 11:40:47 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Thu Oct 26 11:40:47 2017 -0700 ---------------------------------------------------------------------- .../vertexmanager/FairShuffleVertexManager.java | 9 +- .../vertexmanager/ShuffleVertexManagerBase.java | 22 +++-- .../TestFairShuffleVertexManager.java | 99 +++++++++++++------- .../TestShuffleVertexManagerBase.java | 4 +- .../TestShuffleVertexManagerUtils.java | 37 ++++++-- 5 files changed, 121 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java index a8b336c..f3971eb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java @@ -234,7 +234,9 @@ public class FairShuffleVertexManager extends ShuffleVertexManagerBase { } else { for (int i = 0; i < numOfPartitions; i++) { estimatedPartitionOutputSize[i] = - MB * getExpectedStatsAtIndex(i); + getExpectedStatsAtIndex(i); + LOG.info("Partition index {} with size {}", i, + estimatedPartitionOutputSize[i]); } } return estimatedPartitionOutputSize; @@ -419,9 +421,12 @@ public class FairShuffleVertexManager extends ShuffleVertexManagerBase { } Iterator<DestinationTaskInputsProperty> it = iterator(); while(it.hasNext()) { + DestinationTaskInputsProperty property = it.next(); sourceVertexInfo.getDestinationInputsProperties().put( - destinationIndex,it.next()); + destinationIndex, property); destinationIndex++; + LOG.info("Destination Index {}: Input Property {}", + destinationIndex, property); } startNextPartitionsGroup(); } http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java index 967d0ea..bb63bd5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java @@ -148,9 +148,14 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin { int getNumCompletedTasks() { return finishedTaskSet.cardinality(); } - int getExpectedStatsInMBAtIndex(int index) { + + BigInteger getExpectedStatsAtIndex(int index) { return (numVMEventsReceived == 0) ? - 0: statsInMB[index] * numTasks / numVMEventsReceived; + BigInteger.ZERO : + BigInteger.valueOf(statsInMB[index]). + multiply(BigInteger.valueOf(numTasks)). + divide(BigInteger.valueOf(numVMEventsReceived)). + multiply(BigInteger.valueOf(MB)); } } @@ -464,12 +469,17 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin { return stats; } - int getExpectedStatsAtIndex(int index) { - int stats = 0; + long getExpectedStatsAtIndex(int index) { + BigInteger stats = BigInteger.ZERO; for(SourceVertexInfo entry : getAllSourceVertexInfo()) { - stats += entry.getExpectedStatsInMBAtIndex(index); + stats = stats.add(entry.getExpectedStatsAtIndex(index)); + } + if (stats.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) { + LOG.warn("Partition {}'s size {} exceeded Long.MAX_VALUE", index, stats); + return Long.MAX_VALUE; + } else { + return stats.longValue(); } - return stats; } /** http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java index 61ca785..de857bc 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java @@ -112,17 +112,22 @@ public class TestFairShuffleVertexManager @Test(timeout = 5000) public void testReduceSchedulingWithPartitionStats() throws Exception { + final int numScatherAndGatherSourceTasks = 300; final Map<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>(); - testSchedulingWithPartitionStats(FairRoutingType.REDUCE_PARALLELISM, - 2, 2, newEdgeManagers); + long[] partitionStats = new long[]{(MB), (2 * MB), (5 * MB)}; + testSchedulingWithPartitionStats( + FairRoutingType.REDUCE_PARALLELISM, numScatherAndGatherSourceTasks, + partitionStats, 2,2, 2, newEdgeManagers); EdgeManagerPluginOnDemand edgeManager = (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next(); // The first destination task fetches two partitions from all source tasks. - // 6 == 3 source tasks * 2 merged partitions - Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0)); - for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) { + // Thus the # of inputs == # of source tasks * 2 merged partitions + Assert.assertEquals(numScatherAndGatherSourceTasks * 2, + edgeManager.getNumDestinationTaskPhysicalInputs(0)); + for (int sourceTaskIndex = 0; + sourceTaskIndex < numScatherAndGatherSourceTasks; sourceTaskIndex++) { for (int j = 0; j < 2; j++) { if (j == 0) { EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = @@ -144,19 +149,26 @@ public class TestFairShuffleVertexManager @Test(timeout = 5000) public void testFairSchedulingWithPartitionStats() throws Exception { + final int numScatherAndGatherSourceTasks = 300; final Map<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>(); - testSchedulingWithPartitionStats(FairRoutingType.FAIR_PARALLELISM, - 3, 2, newEdgeManagers); + long[] partitionStats = new long[]{(MB), (2 * MB), (5 * MB)}; + + testSchedulingWithPartitionStats( + FairRoutingType.FAIR_PARALLELISM, + numScatherAndGatherSourceTasks, partitionStats, + 2, 3, 2, newEdgeManagers); // Get the first edgeManager which is SCATTER_GATHER. EdgeManagerPluginOnDemand edgeManager = (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next(); // The first destination task fetches two partitions from all source tasks. - // 6 == 3 source tasks * 2 merged partitions - Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0)); - for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) { + // Thus the # of inputs == # of source tasks * 2 merged partitions + Assert.assertEquals(numScatherAndGatherSourceTasks * 2, + edgeManager.getNumDestinationTaskPhysicalInputs(0)); + for (int sourceTaskIndex = 0; sourceTaskIndex < numScatherAndGatherSourceTasks; + sourceTaskIndex++) { for (int j = 0; j < 2; j++) { if (j == 0) { EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = @@ -175,9 +187,10 @@ public class TestFairShuffleVertexManager } } - // The 2nd destination task fetches one partition from the first source - // task. - Assert.assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); + // The 2nd destination task fetches one partition from the first half of + // source tasks. + Assert.assertEquals(numScatherAndGatherSourceTasks / 2, + edgeManager.getNumDestinationTaskPhysicalInputs(1)); for (int j = 0; j < 2; j++) { if (j == 0) { EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = @@ -193,33 +206,59 @@ public class TestFairShuffleVertexManager } } - // The 3rd destination task fetches one partition from the 2nd and 3rd - // source task. - Assert.assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(2)); - for (int sourceTaskIndex = 1; sourceTaskIndex < 3; sourceTaskIndex++) { + // The 3rd destination task fetches one partition from 2nd half of + // source tasks. + Assert.assertEquals(numScatherAndGatherSourceTasks / 2, + edgeManager.getNumDestinationTaskPhysicalInputs(2)); + for (int sourceTaskIndex = numScatherAndGatherSourceTasks / 2; + sourceTaskIndex < numScatherAndGatherSourceTasks; sourceTaskIndex++) { for (int j = 0; j < 2; j++) { if (j == 0) { EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 2); Assert.assertEquals(1, routeMetadata.getCount()); Assert.assertEquals(2, routeMetadata.getSource()); - Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTarget()); + Assert.assertEquals( + sourceTaskIndex - numScatherAndGatherSourceTasks / 2, + routeMetadata.getTarget()); } else { EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata = edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 2); Assert.assertEquals(1, routeMetadata.getNumEvents()); - Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTargetIndices()[0]); + Assert.assertEquals(sourceTaskIndex - numScatherAndGatherSourceTasks / 2, + routeMetadata.getTargetIndices()[0]); } } } } + @Test(timeout = 500000) + public void testOverflow() throws Exception { + final int numScatherAndGatherSourceTasks = 30000; + final Map<String, EdgeManagerPlugin> newEdgeManagers = + new HashMap<String, EdgeManagerPlugin>(); + final int firstPartitionSize = 1; + final int secondPartitionSize = 2; + final int thirdPartitionSize = 500; + long[] partitionStats = new long[]{(firstPartitionSize * MB), + (secondPartitionSize * MB), (thirdPartitionSize * MB)}; + final int expectedDestinationTasks = + (firstPartitionSize + secondPartitionSize + thirdPartitionSize) + * numScatherAndGatherSourceTasks / 1000; + + testSchedulingWithPartitionStats( + FairRoutingType.FAIR_PARALLELISM, + numScatherAndGatherSourceTasks, partitionStats, 1000, + expectedDestinationTasks, 3, newEdgeManagers); + } + // Create a DAG with one destination vertexes connected to 3 source vertexes. // There are 3 tasks for each vertex. One edge is of type SCATTER_GATHER. // The other edges are BROADCAST. private void testSchedulingWithPartitionStats( - FairRoutingType fairRoutingType, int expectedScheduledTasks, - int expectedNumDestinationConsumerTasks, + FairRoutingType fairRoutingType, int numTasks, long[] partitionStats, + int numCompletedEvents, + int expectedScheduledTasks, int expectedNumDestinationConsumerTasks, Map<String, EdgeManagerPlugin> newEdgeManagers) throws Exception { Configuration conf = new Configuration(); @@ -227,7 +266,7 @@ public class TestFairShuffleVertexManager HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); String r1 = "R1"; - final int numOfTasksInr1 = 3; + final int numOfTasksInr1 = numTasks; EdgeProperty eProp1 = EdgeProperty.create( EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, @@ -291,20 +330,16 @@ public class TestFairShuffleVertexManager manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - //Send an event for r1. - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); Assert.assertTrue(manager.pendingTasks.size() == numOfTasksInDestination); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == numOfTasksInr1); - long[] sizes = new long[]{(50 * MB), (200 * MB), (500 * MB)}; - VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 800 * MB, - r1, true); - manager.onVertexManagerEventReceived(vmEvent); //send VM event - //stats from another task - sizes = new long[]{(60 * MB), (300 * MB), (600 * MB)}; - vmEvent = getVertexManagerEvent(sizes, 1200 * MB, r1, true); - manager.onVertexManagerEventReceived(vmEvent); //send VM event + for (int i = 0; i < numCompletedEvents; i++) { + VertexManagerEvent vmEvent = getVertexManagerEvent(partitionStats, 0, + r1, true); + manager.onSourceTaskCompleted(vmEvent.getProducerAttemptIdentifier()); + manager.onVertexManagerEventReceived(vmEvent); //send VM event + } //Send an event for m2. manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java index 96f46d6..9c3a5b3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java @@ -210,7 +210,7 @@ public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils //{5,9,12,18} in bitmap final long MB = 1024l * 1024l; long[] sizes = new long[]{(0l), (1 * MB), (964 * MB), (48 * MB)}; - VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", false); + VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 0, "Vertex", false); manager = createManager(conf, mockContext, 0.01f, 0.75f); manager.onVertexStarted(emptyCompletions); @@ -239,7 +239,7 @@ public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils Assert.assertEquals(10, manager.getCurrentlyKnownStatsAtIndex(3)); //10 MB bucket // Testing for detailed partition stats - vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", true); + vmEvent = getVertexManagerEvent(sizes, 0, "Vertex", true); manager = createManager(conf, mockContext, 0.01f, 0.75f); manager.onVertexStarted(emptyCompletions); http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java index 439d650..9281222 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java @@ -118,24 +118,35 @@ public class TestShuffleVertexManagerUtils { } VertexManagerEvent getVertexManagerEvent(long[] sizes, - long totalSize, String vertexName) throws IOException { - return getVertexManagerEvent(sizes, totalSize, vertexName, false); + long inputSize, String vertexName) throws IOException { + return getVertexManagerEvent(sizes, inputSize, vertexName, false); } - VertexManagerEvent getVertexManagerEvent(long[] sizes, - long totalSize, String vertexName, boolean reportDetailedStats) + VertexManagerEvent getVertexManagerEvent(long[] partitionSizes, + long uncompressedTotalSize, String vertexName, boolean reportDetailedStats) throws IOException { ByteBuffer payload; - if (sizes != null) { - RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes); + long totalSize = 0; + // Use partition sizes to compute the total size. + if (partitionSizes != null) { + totalSize = estimatedUncompressedSum(partitionSizes); + } else { + totalSize = uncompressedTotalSize; + } + if (partitionSizes != null) { + RoaringBitmap partitionStats = + ShuffleUtils.getPartitionStatsForPhysicalOutput(partitionSizes); DataOutputBuffer dout = new DataOutputBuffer(); partitionStats.serialize(dout); ByteString - partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); + partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString( + dout.getData()); if (reportDetailedStats) { payload = VertexManagerEventPayloadProto.newBuilder() .setOutputSize(totalSize) - .setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizes)) + .setDetailedPartitionStats( + ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput( + partitionSizes)) .build().toByteString() .asReadOnlyByteBuffer(); } else { @@ -159,6 +170,16 @@ public class TestShuffleVertexManagerUtils { return vmEvent; } + // Assume 3 : 1 compression ratio to estimate the total size + // of all partitions. + long estimatedUncompressedSum(long[] partitionStats) { + long sum = 0; + for (long partition : partitionStats) { + sum += partition; + } + return sum * 3; + } + public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) { VertexIdentifier mockVertex = mock(VertexIdentifier.class); when(mockVertex.getName()).thenReturn(vName);
