Repository: tez
Updated Branches:
  refs/heads/master a33a62591 -> 3347c94f8


TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3347c94f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3347c94f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3347c94f

Branch: refs/heads/master
Commit: 3347c94f863867aab241db34aafa502dc6a1ff1b
Parents: a33a625
Author: Rajesh Balamohan <rbalamo...@apache.org>
Authored: Fri Jul 31 07:36:33 2015 +0530
Committer: Rajesh Balamohan <rbalamo...@apache.org>
Committed: Fri Jul 31 07:36:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/shuffle/impl/ShuffleManager.java     | 31 ++++++++++++++++----
 .../orderedgrouped/ShuffleScheduler.java        | 11 +++++--
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3347c94f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b60ed7..deb874d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
   TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.

http://git-wip-us.apache.org/repos/asf/tez/blob/3347c94f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b7c0742..600c332 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -141,8 +141,10 @@ public class ShuffleManager implements FetcherCallback {
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   
-  private final String srcNameTrimmed; 
-  
+  private final String srcNameTrimmed;
+
+  private final int maxTaskOutputAtOnce;
+
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private final TezCounter shuffledInputsCounter;
@@ -254,6 +256,14 @@ public class ShuffleManager implements FetcherCallback {
         
inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     this.shufflePort = 
ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData);
 
+    /**
+     * Setting to very high val can lead to Http 400 error. Cap it to 75; 
every attempt id would
+     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room 
for other info in URL.
+     */
+    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
+        
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
+
     Arrays.sort(this.localDisks);
 
     shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, 
ShuffleEventInfo>();
@@ -264,7 +274,7 @@ public class ShuffleManager implements FetcherCallback {
         + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength 
+", "
         + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
         + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
-        + httpConnectionParams.toString());
+        + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + 
maxTaskOutputAtOnce);
   }
 
   public void run() throws IOException {
@@ -407,6 +417,7 @@ public class ShuffleManager implements FetcherCallback {
     // remove from the obsolete list.
     List<InputAttemptIdentifier> pendingInputsForHost = inputHost
         .clearAndGetPendingInputs();
+    int includedMaps = 0;
     for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
         .iterator(); inputIter.hasNext();) {
       InputAttemptIdentifier input = inputIter.next();
@@ -424,10 +435,20 @@ public class ShuffleManager implements FetcherCallback {
       // Avoid adding attempts which have been marked as OBSOLETE 
       if (obsoletedInputs.contains(input)) {
         inputIter.remove();
+        continue;
+      }
+
+      // Check if max threshold is met
+      if (includedMaps >= maxTaskOutputAtOnce) {
+        inputIter.remove();
+        inputHost.addKnownInput(input); //add to inputHost
+      } else {
+        includedMaps++;
       }
     }
-    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
-    // fetcher, especially in the case where #hosts < #fetchers
+    if (inputHost.getNumPendingInputs() > 0) {
+      pendingHosts.add(inputHost); //add it to queue
+    }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
     LOG.info("Created Fetcher for host: " + inputHost.getHost()

http://git-wip-us.apache.org/repos/asf/tez/blob/3347c94f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 75dca64..281844f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -271,9 +271,13 @@ class ShuffleScheduler {
         conf.getBoolean(
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
-    this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+    /**
+     * Setting to very high val can lead to Http 400 error. Cap it to 75; 
every attempt id would
+     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room 
for other info in URL.
+     */
+    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
-            
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
+            
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
     
     this.skippedInputCounter = 
inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
     this.firstEventReceived = 
inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -286,7 +290,8 @@ class ShuffleScheduler {
         + ", reportReadErrorImmediately=" + reportReadErrorImmediately
         + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
         + ", abortFailureLimit=" + abortFailureLimit
-        + ", maxMapRuntime=" + maxMapRuntime);
+        + ", maxMapRuntime=" + maxMapRuntime
+        + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
   }
 
   public void start() throws Exception {

Reply via email to