Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 3b20be06f -> bc3c17a57


TEZ-3597. Composite Fetch hangs on certain DME empty events. (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3a1f2c60
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3a1f2c60
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3a1f2c60

Branch: refs/heads/TEZ-3334
Commit: 3a1f2c60ffa0dcf462b84e9f6d5fd42d25c4d3c1
Parents: 3b20be0
Author: Jonathan Eagles <[email protected]>
Authored: Tue Jan 31 18:03:42 2017 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Tue Jan 31 18:03:42 2017 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                              |  1 +
 .../shuffle/orderedgrouped/ShuffleScheduler.java  | 18 +++++++++++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3a1f2c60/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index e40759f..8b55322 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3597. Composite Fetch hangs on certain DME empty events.
   TEZ-3595. Composite Fetch account error for disk direct
   TEZ-3590. Remove google.protobuf from the tez-auxservices shaded jar
   TEZ-3587. Fetcher fetchInputs() can NPE on srcAttempt due to missing entry in pathToAttemptMap

http://git-wip-us.apache.org/repos/asf/tez/blob/3a1f2c60/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 129e0cc..6e42bca 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1110,8 +1110,14 @@ class ShuffleScheduler {
   }
   
   private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
-    return (!obsoleteInputs.contains(id) && 
-             !isInputFinished(id.getInputIdentifier()));
+    boolean isInputFinished = false;
+    if (id instanceof CompositeInputAttemptIdentifier) {
+      CompositeInputAttemptIdentifier cid = (CompositeInputAttemptIdentifier)id;
+      isInputFinished = isInputFinished(cid.getInputIdentifier(), cid.getInputIdentifier() + cid.getInputIdentifierCount());
+    } else {
+      isInputFinished = isInputFinished(id.getInputIdentifier());
+    }
+    return !obsoleteInputs.contains(id) && !isInputFinished;
   }
 
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
@@ -1286,13 +1292,19 @@ class ShuffleScheduler {
       finishedMaps.set(inputIndex, true);
     }
   }
-  
+
   boolean isInputFinished(int inputIndex) {
     synchronized (finishedMaps) {
       return finishedMaps.get(inputIndex);
     }
   }
 
+  boolean isInputFinished(int inputIndex, int inputEnd) {
+    synchronized (finishedMaps) {
+      return finishedMaps.nextClearBit(inputIndex) >= inputEnd;
+    }
+  }
+
   private class ShuffleSchedulerCallable extends CallableWithNdc<Void> {
 
 

Reply via email to