Repository: tez
Updated Branches:
  refs/heads/master 41ac55155 -> 12695f3d0
TEZ-1911. addendum patch. MergeManager's unconditionalReserve() should check for memory limits before allocating memory to IntermediateMemoryToMemoryMerger (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/12695f3d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/12695f3d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/12695f3d

Branch: refs/heads/master
Commit: 12695f3d030b287d10d221fe28d69b6fa5bbc09a
Parents: 41ac551
Author: Rajesh Balamohan <[email protected]>
Authored: Sat Feb 27 07:03:18 2016 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Sat Feb 27 07:03:18 2016 +0530

----------------------------------------------------------------------
 .../library/common/shuffle/orderedgrouped/MergeManager.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/12695f3d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index b01609c..6c60a80 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -656,13 +656,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       Iterator<MapOutput> it = inputs.iterator();
       while(it.hasNext() && !Thread.currentThread().isInterrupted()) {
         MapOutput mo = it.next();
-        if ((mergeOutputSize + mo.getSize() + usedMemory) > memoryLimit) {
+        if ((mergeOutputSize + mo.getSize() + manager.getUsedMemory()) > memoryLimit) {
           //Search for smaller segments that can fit into existing mem
           if (LOG.isDebugEnabled()) {
             LOG.debug("Size is greater than usedMemory. "
                 + "mergeOutputSize=" + mergeOutputSize
                 + ", moSize=" + mo.getSize()
-                + ", usedMemory=" + usedMemory
+                + ", usedMemory=" + manager.getUsedMemory()
                 + ", memoryLimit=" + memoryLimit);
           }
           continue;
@@ -1209,7 +1209,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   }
   @VisibleForTesting
-  long getUsedMemory() {
+  synchronized long getUsedMemory() {
     return usedMemory;
   }
