Repository: tez
Updated Branches:
  refs/heads/master 0033da85d -> 4c2963588


TEZ-3831 addendum. Reduce Unordered memory needed for storing empty completed 
events (Jonathan Eagles via kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c296358
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c296358
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c296358

Branch: refs/heads/master
Commit: 4c29635880446c7b13ca6318f5429d2887538d45
Parents: 0033da8
Author: Kuhu Shukla <[email protected]>
Authored: Fri Sep 15 14:41:41 2017 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Fri Sep 15 14:41:41 2017 -0500

----------------------------------------------------------------------
 .../library/common/shuffle/impl/ShuffleManager.java      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4c296358/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 8e9be12..0a0286e 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -721,6 +721,10 @@ public class ShuffleManager implements FetcherCallback {
 
       int numComplete = numCompletedInputs.incrementAndGet();
       if (numComplete == numInputs) {
+        // Poison pill End of Input message to awake blocking take call
+        if (fetchedInput instanceof NullFetchedInput) {
+          completedInputs.add(fetchedInput);
+        }
         LOG.info("All inputs fetched for input vertex : " + 
inputContext.getSourceVertexName());
       }
     } finally {
@@ -875,7 +879,12 @@ public class ShuffleManager implements FetcherCallback {
     } finally {
       lock.unlock();
     }
-    return completedInputs.take(); // block
+    // Block until next input or End of Input message
+    FetchedInput fetchedInput = completedInputs.take();
+    if (fetchedInput instanceof NullFetchedInput) {
+      fetchedInput = null;
+    }
+    return fetchedInput;
   }
 
   public int getNumInputs() {

Reply via email to