Repository: tez
Updated Branches:
  refs/heads/master ec9135145 -> 8247a643f


TEZ-3666. Integer overflow in ShuffleVertexManagerBase (Ming Ma via zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8247a643
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8247a643
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8247a643

Branch: refs/heads/master
Commit: 8247a643f9fd62b270395ae255036706f5153d7c
Parents: ec91351
Author: Zhiyuan Yang <[email protected]>
Authored: Thu Oct 26 11:40:47 2017 -0700
Committer: Zhiyuan Yang <[email protected]>
Committed: Thu Oct 26 11:40:47 2017 -0700

----------------------------------------------------------------------
 .../vertexmanager/FairShuffleVertexManager.java |  9 +-
 .../vertexmanager/ShuffleVertexManagerBase.java | 22 +++--
 .../TestFairShuffleVertexManager.java           | 99 +++++++++++++-------
 .../TestShuffleVertexManagerBase.java           |  4 +-
 .../TestShuffleVertexManagerUtils.java          | 37 ++++++--
 5 files changed, 121 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java
index a8b336c..f3971eb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java
@@ -234,7 +234,9 @@ public class FairShuffleVertexManager extends ShuffleVertexManagerBase {
     } else {
       for (int i = 0; i < numOfPartitions; i++) {
         estimatedPartitionOutputSize[i] =
-            MB * getExpectedStatsAtIndex(i);
+            getExpectedStatsAtIndex(i);
+        LOG.info("Partition index {} with size {}", i,
+            estimatedPartitionOutputSize[i]);
       }
     }
     return estimatedPartitionOutputSize;
@@ -419,9 +421,12 @@ public class FairShuffleVertexManager extends ShuffleVertexManagerBase {
         }
         Iterator<DestinationTaskInputsProperty> it = iterator();
         while(it.hasNext()) {
+          DestinationTaskInputsProperty property = it.next();
           sourceVertexInfo.getDestinationInputsProperties().put(
-              destinationIndex,it.next());
+              destinationIndex, property);
           destinationIndex++;
+          LOG.info("Destination Index {}: Input Property {}",
+              destinationIndex, property);
         }
         startNextPartitionsGroup();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index 967d0ea..bb63bd5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -148,9 +148,14 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
     int getNumCompletedTasks() {
       return finishedTaskSet.cardinality();
     }
-    int getExpectedStatsInMBAtIndex(int index) {
+
+    BigInteger getExpectedStatsAtIndex(int index) {
       return (numVMEventsReceived == 0) ?
-          0: statsInMB[index] * numTasks / numVMEventsReceived;
+         BigInteger.ZERO :
+         BigInteger.valueOf(statsInMB[index]).
+           multiply(BigInteger.valueOf(numTasks)).
+           divide(BigInteger.valueOf(numVMEventsReceived)).
+           multiply(BigInteger.valueOf(MB));
     }
   }
 
@@ -464,12 +469,17 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
     return stats;
   }
 
-  int getExpectedStatsAtIndex(int index) {
-    int stats = 0;
+  long getExpectedStatsAtIndex(int index) {
+    BigInteger stats = BigInteger.ZERO;
     for(SourceVertexInfo entry : getAllSourceVertexInfo()) {
-      stats += entry.getExpectedStatsInMBAtIndex(index);
+      stats = stats.add(entry.getExpectedStatsAtIndex(index));
+    }
+    if (stats.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
+      LOG.warn("Partition {}'s size {} exceeded Long.MAX_VALUE", index, stats);
+      return Long.MAX_VALUE;
+    } else {
+      return stats.longValue();
     }
-    return stats;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
index 61ca785..de857bc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
@@ -112,17 +112,22 @@ public class TestFairShuffleVertexManager
 
   @Test(timeout = 5000)
   public void testReduceSchedulingWithPartitionStats() throws Exception {
+    final int numScatherAndGatherSourceTasks = 300;
     final Map<String, EdgeManagerPlugin> newEdgeManagers =
         new HashMap<String, EdgeManagerPlugin>();
-    testSchedulingWithPartitionStats(FairRoutingType.REDUCE_PARALLELISM,
-        2, 2, newEdgeManagers);
+    long[] partitionStats = new long[]{(MB), (2 * MB), (5 * MB)};
+    testSchedulingWithPartitionStats(
+        FairRoutingType.REDUCE_PARALLELISM, numScatherAndGatherSourceTasks,
+        partitionStats, 2,2, 2, newEdgeManagers);
     EdgeManagerPluginOnDemand edgeManager =
         (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next();
 
     // The first destination task fetches two partitions from all source tasks.
-    // 6 == 3 source tasks * 2 merged partitions
-    Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0));
-    for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) {
+    // Thus the # of inputs == # of source tasks * 2 merged partitions
+    Assert.assertEquals(numScatherAndGatherSourceTasks * 2,
+        edgeManager.getNumDestinationTaskPhysicalInputs(0));
+    for (int sourceTaskIndex = 0;
+        sourceTaskIndex < numScatherAndGatherSourceTasks; sourceTaskIndex++) {
       for (int j = 0; j < 2; j++) {
         if (j == 0) {
           EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
@@ -144,19 +149,26 @@ public class TestFairShuffleVertexManager
 
   @Test(timeout = 5000)
   public void testFairSchedulingWithPartitionStats() throws Exception {
+    final int numScatherAndGatherSourceTasks = 300;
     final Map<String, EdgeManagerPlugin> newEdgeManagers =
         new HashMap<String, EdgeManagerPlugin>();
-    testSchedulingWithPartitionStats(FairRoutingType.FAIR_PARALLELISM,
-        3, 2, newEdgeManagers);
+    long[] partitionStats = new long[]{(MB), (2 * MB), (5 * MB)};
+
+    testSchedulingWithPartitionStats(
+        FairRoutingType.FAIR_PARALLELISM,
+        numScatherAndGatherSourceTasks, partitionStats,
+        2, 3, 2, newEdgeManagers);
 
     // Get the first edgeManager which is SCATTER_GATHER.
     EdgeManagerPluginOnDemand edgeManager =
         (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next();
 
     // The first destination task fetches two partitions from all source tasks.
-    // 6 == 3 source tasks * 2 merged partitions
-    Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0));
-    for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) {
+    // Thus the # of inputs == # of source tasks * 2 merged partitions
+    Assert.assertEquals(numScatherAndGatherSourceTasks * 2,
+        edgeManager.getNumDestinationTaskPhysicalInputs(0));
+    for (int sourceTaskIndex = 0; sourceTaskIndex < numScatherAndGatherSourceTasks;
+        sourceTaskIndex++) {
       for (int j = 0; j < 2; j++) {
         if (j == 0) {
           EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
@@ -175,9 +187,10 @@ public class TestFairShuffleVertexManager
       }
     }
 
-    // The 2nd destination task fetches one partition from the first source
-    // task.
-    Assert.assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
+    // The 2nd destination task fetches one partition from the first half of
+    // source tasks.
+    Assert.assertEquals(numScatherAndGatherSourceTasks / 2,
+        edgeManager.getNumDestinationTaskPhysicalInputs(1));
     for (int j = 0; j < 2; j++) {
       if (j == 0) {
         EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
@@ -193,33 +206,59 @@ public class TestFairShuffleVertexManager
       }
     }
 
-    // The 3rd destination task fetches one partition from the 2nd and 3rd
-    // source task.
-    Assert.assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(2));
-    for (int sourceTaskIndex = 1; sourceTaskIndex < 3; sourceTaskIndex++) {
+    // The 3rd destination task fetches one partition from 2nd half of
+    // source tasks.
+    Assert.assertEquals(numScatherAndGatherSourceTasks / 2,
+        edgeManager.getNumDestinationTaskPhysicalInputs(2));
+    for (int sourceTaskIndex = numScatherAndGatherSourceTasks / 2;
+        sourceTaskIndex < numScatherAndGatherSourceTasks; sourceTaskIndex++) {
       for (int j = 0; j < 2; j++) {
         if (j == 0) {
           EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
               edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 2);
           Assert.assertEquals(1, routeMetadata.getCount());
           Assert.assertEquals(2, routeMetadata.getSource());
-          Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTarget());
+          Assert.assertEquals(
+              sourceTaskIndex - numScatherAndGatherSourceTasks / 2,
+              routeMetadata.getTarget());
         } else {
           EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata =
               edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 2);
           Assert.assertEquals(1, routeMetadata.getNumEvents());
-          Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTargetIndices()[0]);
+          Assert.assertEquals(sourceTaskIndex - numScatherAndGatherSourceTasks / 2,
+              routeMetadata.getTargetIndices()[0]);
         }
       }
     }
   }
 
+  @Test(timeout = 500000)
+  public void testOverflow() throws Exception {
+    final int numScatherAndGatherSourceTasks = 30000;
+    final Map<String, EdgeManagerPlugin> newEdgeManagers =
+            new HashMap<String, EdgeManagerPlugin>();
+    final int firstPartitionSize = 1;
+    final int secondPartitionSize = 2;
+    final int thirdPartitionSize = 500;
+    long[] partitionStats = new long[]{(firstPartitionSize * MB),
+        (secondPartitionSize * MB), (thirdPartitionSize * MB)};
+    final int expectedDestinationTasks =
+        (firstPartitionSize + secondPartitionSize + thirdPartitionSize)
+           * numScatherAndGatherSourceTasks / 1000;
+
+    testSchedulingWithPartitionStats(
+        FairRoutingType.FAIR_PARALLELISM,
+        numScatherAndGatherSourceTasks, partitionStats, 1000,
+        expectedDestinationTasks, 3, newEdgeManagers);
+  }
+
   // Create a DAG with one destination vertexes connected to 3 source vertexes.
   // There are 3 tasks for each vertex. One edge is of type SCATTER_GATHER.
   // The other edges are BROADCAST.
   private void testSchedulingWithPartitionStats(
-      FairRoutingType fairRoutingType, int expectedScheduledTasks,
-      int expectedNumDestinationConsumerTasks,
+      FairRoutingType fairRoutingType, int numTasks, long[] partitionStats,
+      int numCompletedEvents,
+      int expectedScheduledTasks, int expectedNumDestinationConsumerTasks,
       Map<String, EdgeManagerPlugin> newEdgeManagers)
       throws Exception {
     Configuration conf = new Configuration();
@@ -227,7 +266,7 @@ public class TestFairShuffleVertexManager
 
     HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
     String r1 = "R1";
-    final int numOfTasksInr1 = 3;
+    final int numOfTasksInr1 = numTasks;
     EdgeProperty eProp1 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
@@ -291,20 +330,16 @@ public class TestFairShuffleVertexManager
         manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
-    //Send an event for r1.
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == numOfTasksInDestination); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == numOfTasksInr1);
 
-    long[] sizes = new long[]{(50 * MB), (200 * MB), (500 * MB)};
-    VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 800 * MB,
-        r1, true);
-    manager.onVertexManagerEventReceived(vmEvent); //send VM event
 
-    //stats from another task
-    sizes = new long[]{(60 * MB), (300 * MB), (600 * MB)};
-    vmEvent = getVertexManagerEvent(sizes, 1200 * MB, r1, true);
-    manager.onVertexManagerEventReceived(vmEvent); //send VM event
+    for (int i = 0; i < numCompletedEvents; i++) {
+      VertexManagerEvent vmEvent = getVertexManagerEvent(partitionStats, 0,
+          r1, true);
+      manager.onSourceTaskCompleted(vmEvent.getProducerAttemptIdentifier());
+      manager.onVertexManagerEventReceived(vmEvent); //send VM event
+    }
 
     //Send an event for m2.
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));

http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
index 96f46d6..9c3a5b3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
@@ -210,7 +210,7 @@ public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils
     //{5,9,12,18} in bitmap
     final long MB = 1024l * 1024l;
     long[] sizes = new long[]{(0l), (1 * MB), (964 * MB), (48 * MB)};
-    VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", false);
+    VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 0, "Vertex", false);
 
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
@@ -239,7 +239,7 @@ public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils
     Assert.assertEquals(10, manager.getCurrentlyKnownStatsAtIndex(3)); //10 MB bucket
 
     // Testing for detailed partition stats
-    vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", true);
+    vmEvent = getVertexManagerEvent(sizes, 0, "Vertex", true);
 
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
     manager.onVertexStarted(emptyCompletions);

http://git-wip-us.apache.org/repos/asf/tez/blob/8247a643/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
index 439d650..9281222 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
@@ -118,24 +118,35 @@ public class TestShuffleVertexManagerUtils {
   }
 
   VertexManagerEvent getVertexManagerEvent(long[] sizes,
-      long totalSize, String vertexName) throws IOException {
-    return getVertexManagerEvent(sizes, totalSize, vertexName, false);
+      long inputSize, String vertexName) throws IOException {
+    return getVertexManagerEvent(sizes, inputSize, vertexName, false);
   }
 
-  VertexManagerEvent getVertexManagerEvent(long[] sizes,
-      long totalSize, String vertexName, boolean reportDetailedStats)
+  VertexManagerEvent getVertexManagerEvent(long[] partitionSizes,
+      long uncompressedTotalSize, String vertexName, boolean reportDetailedStats)
       throws IOException {
     ByteBuffer payload;
-    if (sizes != null) {
-      RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
+    long totalSize = 0;
+    // Use partition sizes to compute the total size.
+    if (partitionSizes != null) {
+      totalSize = estimatedUncompressedSum(partitionSizes);
+    } else {
+      totalSize = uncompressedTotalSize;
+    }
+    if (partitionSizes != null) {
+      RoaringBitmap partitionStats =
+          ShuffleUtils.getPartitionStatsForPhysicalOutput(partitionSizes);
       DataOutputBuffer dout = new DataOutputBuffer();
       partitionStats.serialize(dout);
       ByteString
-          partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+          partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(
+              dout.getData());
       if (reportDetailedStats) {
         payload = VertexManagerEventPayloadProto.newBuilder()
             .setOutputSize(totalSize)
-            .setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizes))
+            .setDetailedPartitionStats(
+                ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(
+                    partitionSizes))
             .build().toByteString()
             .asReadOnlyByteBuffer();
       } else {
@@ -159,6 +170,16 @@ public class TestShuffleVertexManagerUtils {
     return vmEvent;
   }
 
+  // Assume 3 : 1 compression ratio to estimate the total size
+  // of all partitions.
+  long estimatedUncompressedSum(long[] partitionStats) {
+    long sum = 0;
+    for (long partition : partitionStats) {
+      sum += partition;
+    }
+    return sum * 3;
+  }
+
   public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
     VertexIdentifier mockVertex = mock(VertexIdentifier.class);
     when(mockVertex.getName()).thenReturn(vName);

Reply via email to