Repository: tez
Updated Branches:
  refs/heads/branch-0.5 21362e652 -> 5c571d592


TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan)

(cherry picked from commit 3347c94f863867aab241db34aafa502dc6a1ff1b)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5c571d59
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5c571d59
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5c571d59

Branch: refs/heads/branch-0.5
Commit: 5c571d59241aa31d04a9ecaa3385e9f37e459ca7
Parents: 21362e6
Author: Rajesh Balamohan <[email protected]>
Authored: Fri Jul 31 07:36:33 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Mon Aug 3 04:15:16 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/shuffle/impl/ShuffleManager.java     | 31 ++++++++++++++++----
 .../orderedgrouped/ShuffleScheduler.java        | 11 +++++--
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5c571d59/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ad58a1..17757ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
  TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5c571d59/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 69c015e..b032bdc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -136,8 +136,10 @@ public class ShuffleManager implements FetcherCallback {
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   
-  private final String srcNameTrimmed; 
-  
+  private final String srcNameTrimmed;
+
+  private final int maxTaskOutputAtOnce;
+
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private final TezCounter shuffledInputsCounter;
@@ -228,6 +230,14 @@ public class ShuffleManager implements FetcherCallback {
     this.localDisks = Iterables.toArray(
         localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
 
+    /**
+     * Setting to very high val can lead to Http 400 error. Cap it to 75; 
every attempt id would
+     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room 
for other info in URL.
+     */
+    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
+        
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
+
     Arrays.sort(this.localDisks);
 
     LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + 
", compressionCodec="
@@ -236,7 +246,7 @@ public class ShuffleManager implements FetcherCallback {
         + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength 
+", "
         + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
         + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
-        + httpConnectionParams.toString());
+        + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + 
maxTaskOutputAtOnce);
   }
 
   public void run() throws IOException {
@@ -357,6 +367,7 @@ public class ShuffleManager implements FetcherCallback {
     // remove from the obsolete list.
     List<InputAttemptIdentifier> pendingInputsForHost = inputHost
         .clearAndGetPendingInputs();
+    int includedMaps = 0;
     for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
         .iterator(); inputIter.hasNext();) {
       InputAttemptIdentifier input = inputIter.next();
@@ -368,10 +379,20 @@ public class ShuffleManager implements FetcherCallback {
       // Avoid adding attempts which have been marked as OBSOLETE 
       if (obsoletedInputs.contains(input)) {
         inputIter.remove();
+        continue;
+      }
+
+      // Check if max threshold is met
+      if (includedMaps >= maxTaskOutputAtOnce) {
+        inputIter.remove();
+        inputHost.addKnownInput(input); //add to inputHost
+      } else {
+        includedMaps++;
       }
     }
-    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
-    // fetcher, especially in the case where #hosts < #fetchers
+    if (inputHost.getNumPendingInputs() > 0) {
+      pendingHosts.add(inputHost); //add it to queue
+    }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
     LOG.info("Created Fetcher for host: " + inputHost.getHost()

http://git-wip-us.apache.org/repos/asf/tez/blob/5c571d59/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 9cd8c64..db23e0f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -141,9 +141,13 @@ class ShuffleScheduler {
         conf.getBoolean(
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
-    this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+    /**
+     * Setting to very high val can lead to Http 400 error. Cap it to 75; 
every attempt id would
+     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room 
for other info in URL.
+     */
+    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
-            
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
+            
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
     
     this.skippedInputCounter = 
inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
 
@@ -153,7 +157,8 @@ class ShuffleScheduler {
         + ", reportReadErrorImmediately=" + reportReadErrorImmediately
         + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
         + ", abortFailureLimit=" + abortFailureLimit
-        + ", maxMapRuntime=" + maxMapRuntime);
+        + ", maxMapRuntime=" + maxMapRuntime
+        + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
   }
 
   public synchronized void copySucceeded(InputAttemptIdentifier 
srcAttemptIdentifier, 

Reply via email to