TEZ-3685. ShuffleHandler completedInputSet off-by-one error (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d283063c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d283063c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d283063c Branch: refs/heads/master Commit: d283063c5778bdb1db0046a4096143cf0518c667 Parents: e5d01a6 Author: Jonathan Eagles <[email protected]> Authored: Tue Apr 25 15:34:59 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue Apr 25 15:34:59 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../common/shuffle/impl/ShuffleManager.java | 127 +++++++------------ .../orderedgrouped/ShuffleScheduler.java | 2 +- 3 files changed, 51 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 61473e0..e7f0211 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3685. ShuffleHandler completedInputSet off-by-one error TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 3436fc7..57cf4d0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -444,20 +444,17 @@ public class ShuffleManager implements FetcherCallback { } // Avoid adding attempts which have already completed. - if(input instanceof CompositeInputAttemptIdentifier) { - if (completedInputSet.nextClearBit(input.getInputIdentifier()) >= - input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) { - inputIter.remove(); - continue; - } + boolean alreadyCompleted; + if (input instanceof CompositeInputAttemptIdentifier) { + CompositeInputAttemptIdentifier compositeInput = (CompositeInputAttemptIdentifier)input; + int nextClearBit = completedInputSet.nextClearBit(compositeInput.getInputIdentifier()); + int maxClearBit = compositeInput.getInputIdentifier() + compositeInput.getInputIdentifierCount(); + alreadyCompleted = nextClearBit > maxClearBit; } else { - if (completedInputSet.get(input.getInputIdentifier())) { - inputIter.remove(); - continue; - } + alreadyCompleted = completedInputSet.get(input.getInputIdentifier()); } - // Avoid adding attempts which have been marked as OBSOLETE - if (obsoletedInputs.contains(input)) { + // Avoid adding attempts which have already completed or have been marked as OBSOLETE + if (alreadyCompleted || obsoletedInputs.contains(input)) { inputIter.remove(); continue; } @@ -537,23 +534,17 @@ public class ShuffleManager implements FetcherCallback { if (LOG.isDebugEnabled()) { LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."); } - - if (!completedInputSet.get(inputIdentifier)) { - synchronized (completedInputSet) { - if (!completedInputSet.get(inputIdentifier)) { - NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier); - if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { - registerCompletedInput(fetchedInput); - } else { - registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); - } - } - } - } - - // Awake the loop to check for termination. lock.lock(); try { + if (!completedInputSet.get(inputIdentifier)) { + NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier); + if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { + registerCompletedInput(fetchedInput); + } else { + registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); + } + } + // Awake the loop to check for termination. wakeLoop.signal(); } finally { lock.unlock(); @@ -570,7 +561,7 @@ public class ShuffleManager implements FetcherCallback { lastEventReceived.setValue(relativeTime); } - public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) { + void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) { obsoletedInputs.add(srcAttemptIdentifier); // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction. } @@ -632,60 +623,40 @@ public class ShuffleManager implements FetcherCallback { lock.lock(); try { lastProgressTime = System.currentTimeMillis(); - } finally { - lock.unlock(); - } - - inputContext.notifyProgress(); - boolean committed = false; - if (!completedInputSet.get(inputIdentifier)) { - synchronized (completedInputSet) { - if (!completedInputSet.get(inputIdentifier)) { - fetchedInput.commit(); - committed = true; - ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, - fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); - - // Processing counters for completed and commit fetches only. Need - // additional counters for excessive fetches - which primarily comes - // in after speculation or retries. - shuffledInputsCounter.increment(1); - bytesShuffledCounter.increment(fetchedBytes); - if (fetchedInput.getType() == Type.MEMORY) { - bytesShuffledToMemCounter.increment(fetchedBytes); - } else if (fetchedInput.getType() == Type.DISK) { - bytesShuffledToDiskCounter.increment(fetchedBytes); - } else if (fetchedInput.getType() == Type.DISK_DIRECT) { - bytesShuffledDirectDiskCounter.increment(fetchedBytes); - } - decompressedDataSizeCounter.increment(decompressedLength); - - if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { - registerCompletedInput(fetchedInput); - } else { - registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); - } + inputContext.notifyProgress(); + if (!completedInputSet.get(inputIdentifier)) { + fetchedInput.commit(); + ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, + fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); + + // Processing counters for completed and commit fetches only. Need + // additional counters for excessive fetches - which primarily comes + // in after speculation or retries. + shuffledInputsCounter.increment(1); + bytesShuffledCounter.increment(fetchedBytes); + if (fetchedInput.getType() == Type.MEMORY) { + bytesShuffledToMemCounter.increment(fetchedBytes); + } else if (fetchedInput.getType() == Type.DISK) { + bytesShuffledToDiskCounter.increment(fetchedBytes); + } else if (fetchedInput.getType() == Type.DISK_DIRECT) { + bytesShuffledDirectDiskCounter.increment(fetchedBytes); + } + decompressedDataSizeCounter.increment(decompressedLength); - lock.lock(); - try { - totalBytesShuffledTillNow += fetchedBytes; - logProgress(); - } finally { - lock.unlock(); - } + if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { + registerCompletedInput(fetchedInput); + } else { + registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); } - } - } - if (!committed) { - fetchedInput.abort(); // If this fails, the fetcher may attempt another abort. - } else { - lock.lock(); - try { - // Signal the wakeLoop to check for termination. + + totalBytesShuffledTillNow += fetchedBytes; + logProgress(); wakeLoop.signal(); - } finally { - lock.unlock(); + } else { + fetchedInput.abort(); // If this fails, the fetcher may attempt another abort. } + } finally { + lock.unlock(); } // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue. } http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 6e42bca..b6171a3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1301,7 +1301,7 @@ class ShuffleScheduler { boolean isInputFinished(int inputIndex, int inputEnd) { synchronized (finishedMaps) { - return finishedMaps.nextClearBit(inputIndex) >= inputEnd; + return finishedMaps.nextClearBit(inputIndex) > inputEnd; } }
