Repository: tez Updated Branches: refs/heads/branch-0.5 21362e652 -> 5c571d592
TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan) (cherry picked from commit 3347c94f863867aab241db34aafa502dc6a1ff1b) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5c571d59 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5c571d59 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5c571d59 Branch: refs/heads/branch-0.5 Commit: 5c571d59241aa31d04a9ecaa3385e9f37e459ca7 Parents: 21362e6 Author: Rajesh Balamohan <[email protected]> Authored: Fri Jul 31 07:36:33 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Mon Aug 3 04:15:16 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/shuffle/impl/ShuffleManager.java | 31 ++++++++++++++++---- .../orderedgrouped/ShuffleScheduler.java | 11 +++++-- 3 files changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5c571d59/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3ad58a1..17757ff 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2635. Limit number of attempts being downloaded in unordered fetch. TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs. TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error http://git-wip-us.apache.org/repos/asf/tez/blob/5c571d59/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 69c015e..b032bdc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -136,8 +136,10 @@ public class ShuffleManager implements FetcherCallback { private final boolean ifileReadAhead; private final int ifileReadAheadLength; - private final String srcNameTrimmed; - + private final String srcNameTrimmed; + + private final int maxTaskOutputAtOnce; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final TezCounter shuffledInputsCounter; @@ -228,6 +230,14 @@ public class ShuffleManager implements FetcherCallback { this.localDisks = Iterables.toArray( localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class); + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. + */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); + Arrays.sort(this.localDisks); LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec=" @@ -236,7 +246,7 @@ public class ShuffleManager implements FetcherCallback { + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", " + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", " + "sharedFetchEnabled=" + sharedFetchEnabled + ", " - + httpConnectionParams.toString()); + + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } public void run() throws IOException { @@ -357,6 +367,7 @@ public class ShuffleManager implements FetcherCallback { // remove from the obsolete list. List<InputAttemptIdentifier> pendingInputsForHost = inputHost .clearAndGetPendingInputs(); + int includedMaps = 0; for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost .iterator(); inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); @@ -368,10 +379,20 @@ public class ShuffleManager implements FetcherCallback { // Avoid adding attempts which have been marked as OBSOLETE if (obsoletedInputs.contains(input)) { inputIter.remove(); + continue; + } + + // Check if max threshold is met + if (includedMaps >= maxTaskOutputAtOnce) { + inputIter.remove(); + inputHost.addKnownInput(input); //add to inputHost + } else { + includedMaps++; } } - // TODO NEWTEZ Maybe limit the number of inputs being given to a single - // fetcher, especially in the case where #hosts < #fetchers + if (inputHost.getNumPendingInputs() > 0) { + pendingHosts.add(inputHost); //add it to queue + } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), pendingInputsForHost); LOG.info("Created Fetcher for host: " + inputHost.getHost() http://git-wip-us.apache.org/repos/asf/tez/blob/5c571d59/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 9cd8c64..db23e0f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -141,9 +141,13 @@ class ShuffleScheduler { conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); - this.maxTaskOutputAtOnce = Math.max(1, conf.getInt( + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. + */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)); + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); @@ -153,7 +157,8 @@ class ShuffleScheduler { + ", reportReadErrorImmediately=" + reportReadErrorImmediately + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit - + ", maxMapRuntime=" + maxMapRuntime); + + ", maxMapRuntime=" + maxMapRuntime + + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
