Repository: tez
Updated Branches:
  refs/heads/branch-0.6 68f17cf75 -> abf49328b
TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan)

(cherry picked from commit 3347c94f863867aab241db34aafa502dc6a1ff1b)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/abf49328
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/abf49328
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/abf49328

Branch: refs/heads/branch-0.6
Commit: abf49328b5535330608f539c9dd74d331c691035
Parents: 68f17cf
Author: Rajesh Balamohan <[email protected]>
Authored: Fri Jul 31 07:36:33 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Mon Aug 3 04:11:32 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../common/shuffle/impl/ShuffleManager.java | 31 ++++++++++++++++----
 .../orderedgrouped/ShuffleScheduler.java | 11 +++++--
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/abf49328/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 20d31b2..b4ba939 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -215,6 +215,7 @@ INCOMPATIBLE CHANGES
 TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 ALL CHANGES:
+ TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
 TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
 TEZ-2600.
When used with HDFS federation(viewfs) ,tez will throw a error http://git-wip-us.apache.org/repos/asf/tez/blob/abf49328/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index b7067e5..5d1da7c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -135,8 +135,10 @@ public class ShuffleManager implements FetcherCallback { private final boolean ifileReadAhead; private final int ifileReadAheadLength; - private final String srcNameTrimmed; - + private final String srcNameTrimmed; + + private final int maxTaskOutputAtOnce; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final TezCounter shuffledInputsCounter; @@ -235,6 +237,14 @@ public class ShuffleManager implements FetcherCallback { this.localDisks = Iterables.toArray( localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class); + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
+ */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); + Arrays.sort(this.localDisks); LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec=" @@ -243,7 +253,7 @@ public class ShuffleManager implements FetcherCallback { + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", " + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", " + "sharedFetchEnabled=" + sharedFetchEnabled + ", " - + httpConnectionParams.toString()); + + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } public void run() throws IOException { @@ -365,6 +375,7 @@ public class ShuffleManager implements FetcherCallback { // remove from the obsolete list. List<InputAttemptIdentifier> pendingInputsForHost = inputHost .clearAndGetPendingInputs(); + int includedMaps = 0; for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost .iterator(); inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); @@ -376,10 +387,20 @@ public class ShuffleManager implements FetcherCallback { // Avoid adding attempts which have been marked as OBSOLETE if (obsoletedInputs.contains(input)) { inputIter.remove(); + continue; + } + + // Check if max threshold is met + if (includedMaps >= maxTaskOutputAtOnce) { + inputIter.remove(); + inputHost.addKnownInput(input); //add to inputHost + } else { + includedMaps++; } } - // TODO NEWTEZ Maybe limit the number of inputs being given to a single - // fetcher, especially in the case where #hosts < #fetchers + if (inputHost.getNumPendingInputs() > 0) { + pendingHosts.add(inputHost); //add it to queue + } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), pendingInputsForHost); LOG.info("Created Fetcher for host: " + inputHost.getHost() 
http://git-wip-us.apache.org/repos/asf/tez/blob/abf49328/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 52e7334..74f0585 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -143,9 +143,13 @@ class ShuffleScheduler { conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); - this.maxTaskOutputAtOnce = Math.max(1, conf.getInt( + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
+ */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)); + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); @@ -157,7 +161,8 @@ class ShuffleScheduler { + ", reportReadErrorImmediately=" + reportReadErrorImmediately + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit - + ", maxMapRuntime=" + maxMapRuntime); + + ", maxMapRuntime=" + maxMapRuntime + + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } protected synchronized void updateEventReceivedTime() {
