TEZ-1911. addendum patch. MergeManager's unconditionalReserve() should check for memory limits before allocating memory to IntermediateMemoryToMemoryMerger (rbalamohan) (cherry picked from commit 12695f3d030b287d10d221fe28d69b6fa5bbc09a)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ef61eb28 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ef61eb28 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ef61eb28 Branch: refs/heads/branch-0.7 Commit: ef61eb28daa517ec3f870e15581f5b72a6733352 Parents: 2238443 Author: Rajesh Balamohan <[email protected]> Authored: Sat Feb 27 07:03:18 2016 +0530 Committer: Jason Lowe <[email protected]> Committed: Mon Feb 29 15:57:02 2016 +0000 ---------------------------------------------------------------------- .../library/common/shuffle/orderedgrouped/MergeManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ef61eb28/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 4a31d06..b716c01 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -598,13 +598,13 @@ public class MergeManager { Iterator<MapOutput> it = inputs.iterator(); while(it.hasNext() && !Thread.currentThread().isInterrupted()) { MapOutput mo = it.next(); - if ((mergeOutputSize + mo.getSize() + usedMemory) > memoryLimit) { + if ((mergeOutputSize + mo.getSize() + manager.getUsedMemory()) > memoryLimit) { //Search for smaller segments that can fit into existing mem if (LOG.isDebugEnabled()) { LOG.debug("Size is greater than usedMemory. " + "mergeOutputSize=" + mergeOutputSize + ", moSize=" + mo.getSize() - + ", usedMemory=" + usedMemory + + ", usedMemory=" + manager.getUsedMemory() + ", memoryLimit=" + memoryLimit); } continue; @@ -1105,7 +1105,7 @@ public class MergeManager { } @VisibleForTesting - long getUsedMemory() { + synchronized long getUsedMemory() { return usedMemory; }
