Repository: tez Updated Branches: refs/heads/master 0033da85d -> 4c2963588
TEZ-3831 addendum. Reduce Unordered memory needed for storing empty completed events (Jonathan Eagles via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c296358 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c296358 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c296358 Branch: refs/heads/master Commit: 4c29635880446c7b13ca6318f5429d2887538d45 Parents: 0033da8 Author: Kuhu Shukla <[email protected]> Authored: Fri Sep 15 14:41:41 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Fri Sep 15 14:41:41 2017 -0500 ---------------------------------------------------------------------- .../library/common/shuffle/impl/ShuffleManager.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4c296358/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 8e9be12..0a0286e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -721,6 +721,10 @@ public class ShuffleManager implements FetcherCallback { int numComplete = numCompletedInputs.incrementAndGet(); if (numComplete == numInputs) { + // Poison pill End of Input message to awake blocking take call + if (fetchedInput instanceof NullFetchedInput) { + completedInputs.add(fetchedInput); + } LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName()); } } finally { @@ -875,7 +879,12 @@ public class ShuffleManager implements FetcherCallback { } finally { lock.unlock(); } - return completedInputs.take(); // block + // Block until next input or End of Input message + FetchedInput fetchedInput = completedInputs.take(); + if (fetchedInput instanceof NullFetchedInput) { + fetchedInput = null; + } + return fetchedInput; } public int getNumInputs() {
