TEZ-2214. FetcherOrderedGrouped can get stuck indefinitely when MergeManager misses memToDiskMerging (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2fe2d635 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2fe2d635 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2fe2d635 Branch: refs/heads/TEZ-2003 Commit: 2fe2d63529b3fb420c15d4be6bbf50d501edb626 Parents: d1b4bd4 Author: Rajesh Balamohan <[email protected]> Authored: Thu Mar 26 04:12:08 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Mar 26 04:12:08 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../shuffle/orderedgrouped/MergeManager.java | 25 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2fe2d635/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f17c7dc..9db71a8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -251,6 +251,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2214. FetcherOrderedGrouped can get stuck indefinitely when MergeManager misses memToDiskMerging TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure TEZ-2219. Should verify the input_name/output_name to be unique per vertex TEZ-2186. 
OOM with a simple scatter gather job with re-use http://git-wip-us.apache.org/repos/asf/tez/blob/2fe2d635/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 045b91d..d5f7be1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -325,8 +325,29 @@ public class MergeManager { public void waitForInMemoryMerge() throws InterruptedException { inMemoryMerger.waitForMerge(); + + /** + * Memory released during merge process could have been used by active fetchers and if they + * are too fast, 'commitMemory & usedMemory' could have grown beyond allowed threshold. Since + * merge was already in progress, this would not have kicked off another merge and fetchers + * could get into indefinite wait state later. To address this, trigger another merge process + * if needed and wait for it to complete (to release commitMemory & usedMemory). 
+ */ + boolean triggerAdditionalMerge = false; + synchronized (this) { + if (commitMemory >= mergeThreshold) { + startMemToDiskMerge(); + triggerAdditionalMerge = true; + } + } + if (triggerAdditionalMerge) { + inMemoryMerger.waitForMerge(); + if (LOG.isDebugEnabled()) { + LOG.debug("Additional in-memory merge triggered"); + } + } } - + private boolean canShuffleToMemory(long requestedSize) { return (requestedSize < maxSingleShuffleLimit); } @@ -578,7 +599,7 @@ public class MergeManager { if (inputs == null || inputs.size() == 0) { return; } - + numMemToDiskMerges.increment(1); //name this output file same as the name of the first file that is
