Repository: tez
Updated Branches:
  refs/heads/master 7e895f54b -> 09679bdfd

TEZ-3831. Reduce Unordered memory needed for storing empty completed events (Jonathan Eagles via kshukla)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/09679bdf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/09679bdf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/09679bdf

Branch: refs/heads/master
Commit: 09679bdfd8e78a623c15026479b6fce71f48a4df
Parents: 7e895f5
Author: Kuhu Shukla <[email protected]>
Authored: Wed Sep 13 17:24:59 2017 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Wed Sep 13 17:24:59 2017 -0500

----------------------------------------------------------------------
 .../common/shuffle/impl/ShuffleManager.java | 41 ++++++--------------------
 1 file changed, 12 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/09679bdf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index e142228..8e9be12 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -702,7 +702,9 @@ public class ShuffleManager implements FetcherCallback {
   private void maybeInformInputReady(FetchedInput fetchedInput) {
     lock.lock();
     try {
-      completedInputs.add(fetchedInput);
+      if (!(fetchedInput instanceof NullFetchedInput)) {
+        completedInputs.add(fetchedInput);
+      }
       if (!inputReadyNotificationSent.getAndSet(true)) {
         // TODO Should eventually be controlled by Inputs which are processing the data.
         inputContext.inputIsReady();
@@ -846,20 +848,6 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
 
-  /////////////////// Methods for walking the available inputs
-
-  /**
-   * @return true if there is another input ready for consumption.
-   */
-  public boolean newInputAvailable() {
-    FetchedInput head = completedInputs.peek();
-    if (head == null || head instanceof NullFetchedInput) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-
   /**
    * @return true if all of the required inputs have been fetched.
    */
@@ -878,21 +866,16 @@ public class ShuffleManager implements FetcherCallback {
    * but more may become available.
    */
   public FetchedInput getNextInput() throws InterruptedException {
-    FetchedInput input = null;
-    do {
-      // Check for no additional inputs
-      lock.lock();
-      try {
-        input = completedInputs.peek();
-        if (input == null && allInputsFetched()) {
-          break;
-        }
-      } finally {
-        lock.unlock();
+    // Check for no additional inputs
+    lock.lock();
+    try {
+      if (completedInputs.peek() == null && allInputsFetched()) {
+        return null;
       }
-      input = completedInputs.take(); // block
-    } while (input instanceof NullFetchedInput);
-    return input;
+    } finally {
+      lock.unlock();
+    }
+    return completedInputs.take(); // block
   }
 
   public int getNumInputs() {
