Repository: hadoop
Updated Branches:
  refs/heads/trunk c4463f2ef -> 6890d5b47


MAPREDUCE-6724. Single shuffle to memory must not exceed Integer#MAX_VALUE. 
(Haibo Chen via gera)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6890d5b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6890d5b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6890d5b4

Branch: refs/heads/trunk
Commit: 6890d5b472320fa7592ed1b08b623c55a27089c6
Parents: c4463f2
Author: Gera Shegalov <g...@apache.org>
Authored: Thu Jul 28 14:37:03 2016 -0700
Committer: Gera Shegalov <g...@apache.org>
Committed: Mon Aug 1 23:35:47 2016 -0700

----------------------------------------------------------------------
 .../mapreduce/task/reduce/MergeManagerImpl.java | 26 +++++++++++---------
 .../mapreduce/task/reduce/TestMergeManager.java | 23 +++++++++++------
 2 files changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6890d5b4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 1673ff8..09fe0cb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -99,7 +99,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
   private long usedMemory;
   private long commitMemory;
-  private final long maxSingleShuffleLimit;
+
+  @VisibleForTesting
+  final long maxSingleShuffleLimit;
   
   private final int memToMemMergeOutputsThreshold; 
   private final long mergeThreshold;
@@ -187,10 +189,16 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
     usedMemory = 0L;
     commitMemory = 0L;
-    this.maxSingleShuffleLimit = 
-      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
-    this.memToMemMergeOutputsThreshold = 
-            jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
+    long maxSingleShuffleLimitConfiged =
+        (long)(memoryLimit * singleShuffleMemoryLimitPercent);
+    if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) {
+      maxSingleShuffleLimitConfiged = Integer.MAX_VALUE;
+      LOG.info("The max number of bytes for a single in-memory shuffle cannot" +
+          " be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE");
+    }
+    this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged;
+    this.memToMemMergeOutputsThreshold =
+        jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
     this.mergeThreshold = (long)(this.memoryLimit * 
                           jobConf.getFloat(
                             MRJobConfig.SHUFFLE_MERGE_PERCENT,
@@ -249,17 +257,13 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   public void waitForResource() throws InterruptedException {
     inMemoryMerger.waitForMerge();
   }
-  
-  private boolean canShuffleToMemory(long requestedSize) {
-    return (requestedSize < maxSingleShuffleLimit); 
-  }
-  
+
   @Override
   public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
                                              long requestedSize,
                                              int fetcher
                                              ) throws IOException {
-    if (!canShuffleToMemory(requestedSize)) {
+    if (requestedSize > maxSingleShuffleLimit) {
       LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
                " is greater than maxSingleShuffleLimit (" + 
                maxSingleShuffleLimit + ")");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6890d5b4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 1c0d25b..325d2f9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -289,22 +289,29 @@ public class TestMergeManager {
     final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
     assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
         maxInMemReduce > Integer.MAX_VALUE);
+    assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE",
+        Integer.MAX_VALUE, mgr.maxSingleShuffleLimit);
+    verifyReservedMapOutputType(mgr, 10L, "MEMORY");
+    verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK");
+  }
+
+  private void verifyReservedMapOutputType(MergeManagerImpl<Text, Text> mgr,
+      long size, String expectedShuffleMode) throws IOException {
+    final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    final MapOutput<Text, Text> mapOutput = mgr.reserve(mapId, size, 1);
+    assertEquals("Shuffled bytes: " + size, expectedShuffleMode,
+        mapOutput.getDescription());
+    mgr.unreserve(size);
   }
 
   @Test
   public void testZeroShuffleMemoryLimitPercent() throws Exception {
     final JobConf jobConf = new JobConf();
     jobConf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f);
-    final MergeManager<Text, Text> mgr =
+    final MergeManagerImpl<Text, Text> mgr =
         new MergeManagerImpl<>(null, jobConf, mock(LocalFileSystem.class),
             null, null, null, null, null, null, null, null, null, null,
             new MROutputFiles());
-    final long mapOutputSize = 10;
-    final int fetcher = 1;
-    final MapOutput<Text, Text> mapOutput = mgr.reserve(
-        TaskAttemptID.forName("attempt_0_1_m_1_1"),
-        mapOutputSize, fetcher);
-    assertEquals("Tiny map outputs should be shuffled to disk", "DISK",
-        mapOutput.getDescription());
+    verifyReservedMapOutputType(mgr, 10L, "DISK");
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to