Repository: tez
Updated Branches:
  refs/heads/branch-0.7 b3234b6bf -> 962970fd3
TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan) (cherry picked from commit 3347c94f863867aab241db34aafa502dc6a1ff1b) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/962970fd Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/962970fd Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/962970fd Branch: refs/heads/branch-0.7 Commit: 962970fd35d82ad4eacd4b59ec5a924910933e23 Parents: b3234b6 Author: Rajesh Balamohan <[email protected]> Authored: Fri Jul 31 07:36:33 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Mon Aug 3 04:07:16 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/shuffle/impl/ShuffleManager.java | 31 ++++++++++++++++---- .../orderedgrouped/ShuffleScheduler.java | 11 +++++-- 3 files changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/962970fd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2c1475c..8515599 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -423,6 +423,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2635. Limit number of attempts being downloaded in unordered fetch. TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs. TEZ-2600. 
When used with HDFS federation(viewfs) ,tez will throw a error http://git-wip-us.apache.org/repos/asf/tez/blob/962970fd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 8bc44bc..4b837aa 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -143,8 +143,10 @@ public class ShuffleManager implements FetcherCallback { private final boolean ifileReadAhead; private final int ifileReadAheadLength; - private final String srcNameTrimmed; - + private final String srcNameTrimmed; + + private final int maxTaskOutputAtOnce; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final TezCounter shuffledInputsCounter; @@ -256,6 +258,14 @@ public class ShuffleManager implements FetcherCallback { inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData); + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
+ */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); + Arrays.sort(this.localDisks); shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>(); @@ -266,7 +276,7 @@ public class ShuffleManager implements FetcherCallback { + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", " + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", " + "sharedFetchEnabled=" + sharedFetchEnabled + ", " - + httpConnectionParams.toString()); + + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } public void run() throws IOException { @@ -407,6 +417,7 @@ public class ShuffleManager implements FetcherCallback { // remove from the obsolete list. List<InputAttemptIdentifier> pendingInputsForHost = inputHost .clearAndGetPendingInputs(); + int includedMaps = 0; for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost .iterator(); inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); @@ -424,10 +435,20 @@ public class ShuffleManager implements FetcherCallback { // Avoid adding attempts which have been marked as OBSOLETE if (obsoletedInputs.contains(input)) { inputIter.remove(); + continue; + } + + // Check if max threshold is met + if (includedMaps >= maxTaskOutputAtOnce) { + inputIter.remove(); + inputHost.addKnownInput(input); //add to inputHost + } else { + includedMaps++; } } - // TODO NEWTEZ Maybe limit the number of inputs being given to a single - // fetcher, especially in the case where #hosts < #fetchers + if (inputHost.getNumPendingInputs() > 0) { + pendingHosts.add(inputHost); //add it to queue + } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), pendingInputsForHost); LOG.info("Created Fetcher for host: " + inputHost.getHost() 
http://git-wip-us.apache.org/repos/asf/tez/blob/962970fd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index a3d79ae..cdfd511 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -157,9 +157,13 @@ class ShuffleScheduler { conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); - this.maxTaskOutputAtOnce = Math.max(1, conf.getInt( + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
+ */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)); + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); @@ -172,7 +176,8 @@ class ShuffleScheduler { + ", reportReadErrorImmediately=" + reportReadErrorImmediately + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit - + ", maxMapRuntime=" + maxMapRuntime); + + ", maxMapRuntime=" + maxMapRuntime + + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } protected synchronized void updateEventReceivedTime() {
