Repository: tez
Updated Branches:
  refs/heads/master a33a62591 -> 3347c94f8
TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3347c94f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3347c94f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3347c94f

Branch: refs/heads/master
Commit: 3347c94f863867aab241db34aafa502dc6a1ff1b
Parents: a33a625
Author: Rajesh Balamohan <rbalamo...@apache.org>
Authored: Fri Jul 31 07:36:33 2015 +0530
Committer: Rajesh Balamohan <rbalamo...@apache.org>
Committed: Fri Jul 31 07:36:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/shuffle/impl/ShuffleManager.java     | 31 ++++++++++++++++----
 .../orderedgrouped/ShuffleScheduler.java        | 11 +++++--
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/3347c94f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b60ed7..deb874d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.

 ALL CHANGES:
+  TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
   TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.

http://git-wip-us.apache.org/repos/asf/tez/blob/3347c94f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index b7c0742..600c332 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -141,8 +141,10 @@ public class ShuffleManager implements FetcherCallback { private final boolean ifileReadAhead; private final int ifileReadAheadLength; - private final String srcNameTrimmed; - + private final String srcNameTrimmed; + + private final int maxTaskOutputAtOnce; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final TezCounter shuffledInputsCounter; @@ -254,6 +256,14 @@ public class ShuffleManager implements FetcherCallback { inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData); + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
+ */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); + Arrays.sort(this.localDisks); shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>(); @@ -264,7 +274,7 @@ public class ShuffleManager implements FetcherCallback { + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", " + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", " + "sharedFetchEnabled=" + sharedFetchEnabled + ", " - + httpConnectionParams.toString()); + + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } public void run() throws IOException { @@ -407,6 +417,7 @@ public class ShuffleManager implements FetcherCallback { // remove from the obsolete list. List<InputAttemptIdentifier> pendingInputsForHost = inputHost .clearAndGetPendingInputs(); + int includedMaps = 0; for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost .iterator(); inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); @@ -424,10 +435,20 @@ public class ShuffleManager implements FetcherCallback { // Avoid adding attempts which have been marked as OBSOLETE if (obsoletedInputs.contains(input)) { inputIter.remove(); + continue; + } + + // Check if max threshold is met + if (includedMaps >= maxTaskOutputAtOnce) { + inputIter.remove(); + inputHost.addKnownInput(input); //add to inputHost + } else { + includedMaps++; } } - // TODO NEWTEZ Maybe limit the number of inputs being given to a single - // fetcher, especially in the case where #hosts < #fetchers + if (inputHost.getNumPendingInputs() > 0) { + pendingHosts.add(inputHost); //add it to queue + } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), pendingInputsForHost); LOG.info("Created Fetcher for host: " + inputHost.getHost() 
http://git-wip-us.apache.org/repos/asf/tez/blob/3347c94f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 75dca64..281844f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -271,9 +271,13 @@ class ShuffleScheduler { conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); - this.maxTaskOutputAtOnce = Math.max(1, conf.getInt( + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
+ */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)); + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); @@ -286,7 +290,8 @@ class ShuffleScheduler { + ", reportReadErrorImmediately=" + reportReadErrorImmediately + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit - + ", maxMapRuntime=" + maxMapRuntime); + + ", maxMapRuntime=" + maxMapRuntime + + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } public void start() throws Exception {