Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 d002e4d10 -> 5aa26aa66
MAPREDUCE-6724. Single shuffle to memory must not exceed Integer#MAX_VALUE. (Haibo Chen via gera) (cherry picked from commit 0a405c4f71b0697c8ecab6ce7de82a403baafad7) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5aa26aa6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5aa26aa6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5aa26aa6 Branch: refs/heads/branch-2.7 Commit: 5aa26aa66eacae7eeabcd0b22a544286d1857776 Parents: d002e4d Author: Jason Lowe <[email protected]> Authored: Fri Oct 28 14:40:28 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri Oct 28 14:43:40 2016 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 2 ++ .../mapreduce/task/reduce/MergeManagerImpl.java | 26 +++++++++++--------- .../mapreduce/task/reduce/TestMergeManager.java | 14 +++++++++++ 3 files changed, 31 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa26aa6/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7a8ea72..3cee632 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -29,6 +29,8 @@ Release 2.7.4 - UNRELEASED MAPREDUCE-6763. Shuffle server listen queue is too small. (Jason Lowe via Varun Saxena) + MAPREDUCE-6724. Single shuffle to memory must not exceed + Integer#MAX_VALUE. 
(Haibo Chen via jlowe) Release 2.7.3 - 2016-08-25 http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa26aa6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index 3699ddd..787d65a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -99,7 +99,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { private long usedMemory; private long commitMemory; - private final long maxSingleShuffleLimit; + + @VisibleForTesting + final long maxSingleShuffleLimit; private final int memToMemMergeOutputsThreshold; private final long mergeThreshold; @@ -187,10 +189,16 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { usedMemory = 0L; commitMemory = 0L; - this.maxSingleShuffleLimit = - (long)(memoryLimit * singleShuffleMemoryLimitPercent); - this.memToMemMergeOutputsThreshold = - jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); + long maxSingleShuffleLimitConfiged = + (long)(memoryLimit * singleShuffleMemoryLimitPercent); + if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) { + maxSingleShuffleLimitConfiged = Integer.MAX_VALUE; + LOG.info("The max number of bytes for a single in-memory shuffle cannot" + + " be larger than Integer.MAX_VALUE. 
Setting it to Integer.MAX_VALUE"); + } + this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged; + this.memToMemMergeOutputsThreshold = + jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); this.mergeThreshold = (long)(this.memoryLimit * jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.90f)); @@ -248,17 +256,13 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { public void waitForResource() throws InterruptedException { inMemoryMerger.waitForMerge(); } - - private boolean canShuffleToMemory(long requestedSize) { - return (requestedSize < maxSingleShuffleLimit); - } - + @Override public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, long requestedSize, int fetcher ) throws IOException { - if (!canShuffleToMemory(requestedSize)) { + if (requestedSize > maxSingleShuffleLimit) { LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa26aa6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java index ef860af..f164b92 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java @@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MROutputFiles; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; import org.junit.Assert; import org.junit.Test; @@ -288,5 +289,18 @@ public class TestMergeManager { final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, maxInMemReduce > Integer.MAX_VALUE); + assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE", + Integer.MAX_VALUE, mgr.maxSingleShuffleLimit); + verifyReservedMapOutputType(mgr, 10L, "MEMORY"); + verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK"); + } + + private void verifyReservedMapOutputType(MergeManagerImpl<Text, Text> mgr, + long size, String expectedShuffleMode) throws IOException { + final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1"); + final MapOutput<Text, Text> mapOutput = mgr.reserve(mapId, size, 1); + assertEquals("Shuffled bytes: " + size, expectedShuffleMode, + mapOutput.getDescription()); + mgr.unreserve(size); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
