Merge remote-tracking branch 'origin/TEZ-3334' into TEZ-3334-MERGE1
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1a9c282 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1a9c282 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1a9c282 Branch: refs/heads/master Commit: e1a9c2822bb0d3e33cc5e8bc127991380d4ab54f Parents: 54f7784 b81592a Author: Jonathan Eagles <[email protected]> Authored: Thu May 4 15:52:23 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu May 4 15:52:23 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 5 + .../apache/tez/dag/api/TezConfiguration.java | 10 +- .../tez/dag/app/launcher/DagDeleteRunnable.java | 10 +- .../tez/dag/app/launcher/DeletionTracker.java | 12 +- .../dag/app/launcher/DeletionTrackerImpl.java | 31 ++-- .../app/launcher/LocalContainerLauncher.java | 30 ++-- .../app/launcher/TezContainerLauncherImpl.java | 51 ++++--- .../app/rm/container/AMContainerHelpers.java | 10 +- tez-plugins/tez-aux-services/pom.xml | 4 - .../apache/tez/auxservices/ShuffleHandler.java | 8 +- .../tez/auxservices/TestShuffleHandlerJobs.java | 32 ++-- .../runtime/library/common/TezRuntimeUtils.java | 6 +- .../library/common/shuffle/InputHost.java | 2 +- .../library/common/shuffle/ShuffleUtils.java | 13 +- .../common/shuffle/impl/ShuffleManager.java | 146 ++++++++----------- .../common/shuffle/orderedgrouped/Shuffle.java | 3 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 4 +- .../orderedgrouped/ShuffleScheduler.java | 2 +- .../common/sort/impl/PipelinedSorter.java | 9 +- .../common/sort/impl/dflt/DefaultSorter.java | 6 +- .../output/OrderedPartitionedKVOutput.java | 4 +- .../common/shuffle/TestShuffleUtils.java | 13 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 3 +- .../common/sort/impl/TestPipelinedSorter.java | 10 +- .../TestUnorderedPartitionedKVWriter.java | 21 ++- 25 files changed, 227 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 1e23c8f,ba3ecad..31a5d92 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@@ -160,8 -159,10 +158,10 @@@ public class AMContainerHelpers ContainerLaunchContext commonContainerSpec = null; synchronized (commonContainerSpecLock) { if (!commonContainerSpecs.containsKey(tezDAGID)) { + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); commonContainerSpec = - createCommonContainerLaunchContext(acls, credentials, conf); - createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService); ++ createCommonContainerLaunchContext(acls, credentials, localResources, auxiliaryService); commonContainerSpecs.put(tezDAGID, commonContainerSpec); } else { commonContainerSpec = commonContainerSpecs.get(tezDAGID); http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 5661a6d,57cf4d0..bc3ca0e --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@@ -636,60 -623,40 +626,40 @@@ public class ShuffleManager implements lock.lock(); try { lastProgressTime = System.currentTimeMillis(); - } finally { - lock.unlock(); - } - - inputContext.notifyProgress(); - boolean committed = false; - if (!completedInputSet.get(inputIdentifier)) { - synchronized (completedInputSet) { - if (!completedInputSet.get(inputIdentifier)) { - fetchedInput.commit(); - committed = true; - fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes, - decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); - - // Processing counters for completed and commit fetches only. Need - // additional counters for excessive fetches - which primarily comes - // in after speculation or retries. - shuffledInputsCounter.increment(1); - bytesShuffledCounter.increment(fetchedBytes); - if (fetchedInput.getType() == Type.MEMORY) { - bytesShuffledToMemCounter.increment(fetchedBytes); - } else if (fetchedInput.getType() == Type.DISK) { - bytesShuffledToDiskCounter.increment(fetchedBytes); - } else if (fetchedInput.getType() == Type.DISK_DIRECT) { - bytesShuffledDirectDiskCounter.increment(fetchedBytes); - } - decompressedDataSizeCounter.increment(decompressedLength); - - if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { - registerCompletedInput(fetchedInput); - } else { - registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); - } + inputContext.notifyProgress(); + if (!completedInputSet.get(inputIdentifier)) { + fetchedInput.commit(); - ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, ++ fetchStatsLogger.logIndividualFetchComplete(copyDuration, + fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); + + // Processing counters for completed and commit fetches only. Need + // additional counters for excessive fetches - which primarily comes + // in after speculation or retries. + shuffledInputsCounter.increment(1); + bytesShuffledCounter.increment(fetchedBytes); + if (fetchedInput.getType() == Type.MEMORY) { + bytesShuffledToMemCounter.increment(fetchedBytes); + } else if (fetchedInput.getType() == Type.DISK) { + bytesShuffledToDiskCounter.increment(fetchedBytes); + } else if (fetchedInput.getType() == Type.DISK_DIRECT) { + bytesShuffledDirectDiskCounter.increment(fetchedBytes); + } + decompressedDataSizeCounter.increment(decompressedLength); - lock.lock(); - try { - totalBytesShuffledTillNow += fetchedBytes; - logProgress(); - } finally { - lock.unlock(); - } + if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { + registerCompletedInput(fetchedInput); + } else { + registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); } - } - } - if (!committed) { - fetchedInput.abort(); // If this fails, the fetcher may attempt another abort. - } else { - lock.lock(); - try { - // Signal the wakeLoop to check for termination. + + totalBytesShuffledTillNow += fetchedBytes; + logProgress(); wakeLoop.signal(); - } finally { - lock.unlock(); + } else { + fetchedInput.abort(); // If this fails, the fetcher may attempt another abort. } + } finally { + lock.unlock(); } // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue. } http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ----------------------------------------------------------------------
