TEZ-3597. Composite Fetch hangs on certain DME empty events. (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3a1f2c60
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3a1f2c60
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3a1f2c60

Branch: refs/heads/master
Commit: 3a1f2c60ffa0dcf462b84e9f6d5fd42d25c4d3c1
Parents: 3b20be0
Author: Jonathan Eagles <[email protected]>
Authored: Tue Jan 31 18:03:42 2017 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Tue Jan 31 18:03:42 2017 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                              |  1 +
 .../shuffle/orderedgrouped/ShuffleScheduler.java  | 18 +++++++++++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/3a1f2c60/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index e40759f..8b55322 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3597. Composite Fetch hangs on certain DME empty events.
   TEZ-3595. Composite Fetch account error for disk direct
   TEZ-3590. Remove google.protobuf from the tez-auxservices shaded jar
   TEZ-3587. Fetcher fetchInputs() can NPE on srcAttempt due to missing entry in pathToAttemptMap

http://git-wip-us.apache.org/repos/asf/tez/blob/3a1f2c60/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 129e0cc..6e42bca 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1110,8 +1110,14 @@ class ShuffleScheduler {
   }
 
   private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
-    return (!obsoleteInputs.contains(id) &&
-             !isInputFinished(id.getInputIdentifier()));
+    boolean isInputFinished = false;
+    if (id instanceof CompositeInputAttemptIdentifier) {
+      CompositeInputAttemptIdentifier cid = (CompositeInputAttemptIdentifier)id;
+      isInputFinished = isInputFinished(cid.getInputIdentifier(), cid.getInputIdentifier() + cid.getInputIdentifierCount());
+    } else {
+      isInputFinished = isInputFinished(id.getInputIdentifier());
+    }
+    return !obsoleteInputs.contains(id) && !isInputFinished;
   }
 
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
@@ -1286,13 +1292,19 @@ class ShuffleScheduler {
       finishedMaps.set(inputIndex, true);
     }
   }
-  
+
   boolean isInputFinished(int inputIndex) {
     synchronized (finishedMaps) {
       return finishedMaps.get(inputIndex);
     }
   }
 
+  boolean isInputFinished(int inputIndex, int inputEnd) {
+    synchronized (finishedMaps) {
+      return finishedMaps.nextClearBit(inputIndex) >= inputEnd;
+    }
+  }
+
   private class ShuffleSchedulerCallable extends CallableWithNdc<Void> {
