Repository: tez
Updated Branches:
  refs/heads/master 6b67b0bc1 -> 6cb82062d


TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6cb82062
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6cb82062
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6cb82062

Branch: refs/heads/master
Commit: 6cb82062d6728120844b6856af139fd49c3a2ddb
Parents: 6b67b0b
Author: Rajesh Balamohan <[email protected]>
Authored: Mon Aug 17 04:10:06 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Mon Aug 17 04:10:06 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../tez/runtime/library/common/shuffle/FetchResult.java  | 11 +++++++++++
 .../tez/runtime/library/common/shuffle/Fetcher.java      |  6 +-----
 .../tez/runtime/library/common/shuffle/InputHost.java    |  9 +++++++++
 .../library/common/shuffle/impl/ShuffleManager.java      |  3 ++-
 5 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6cb82062/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbe9321..4de1771 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option
   TEZ-2646. Add scheduling casual dependency for attempts
   TEZ-2647. Add input causality dependency for attempts
   TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts

http://git-wip-us.apache.org/repos/asf/tez/blob/6cb82062/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
index 1c39a24..d9595f0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
@@ -43,13 +43,20 @@ public class FetchResult {
   private final int port;
   private final int partition;
   private final Iterable<InputAttemptIdentifier> pendingInputs;
+  private final String additionalInfo;
 
   public FetchResult(String host, int port, int partition,
       Iterable<InputAttemptIdentifier> pendingInputs) {
+    this(host, port, partition, pendingInputs, null);
+  }
+
+  public FetchResult(String host, int port, int partition,
+      Iterable<InputAttemptIdentifier> pendingInputs, String additionalInfo) {
     this.host = host;
     this.port = port;
     this.partition = partition;
     this.pendingInputs = pendingInputs;
+    this.additionalInfo = additionalInfo;
   }
 
   public String getHost() {
@@ -67,4 +74,8 @@ public class FetchResult {
   public Iterable<InputAttemptIdentifier> getPendingInputs() {
     return pendingInputs;
   }
+
+  public String getAdditionalInfo() {
+    return additionalInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6cb82062/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 08b59ed..e8389a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -229,8 +229,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       if (!multiplex) {
         throw new IOException("server didn't return all expected map outputs: "
             + srcAttemptsRemaining.size() + " left.");
-      } else {
-        LOG.info("Shared fetch failed to return " + srcAttemptsRemaining.size() + " inputs on this try");
       }
     }
 
@@ -374,10 +372,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       lock = getLock();
       if (lock == null) {
         // re-queue until we get a lock
-        LOG.info("Requeuing " + host + ":" + port
-            + " downloads because we didn't get a lock");
         return new HostFetchResult(new FetchResult(host, port, partition,
-            srcAttemptsRemaining.values()), null, false);
+            srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false);
       } else {
         if (findInputs() == srcAttemptsRemaining.size()) {
           // double checked after lock

http://git-wip-us.apache.org/repos/asf/tez/blob/6cb82062/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
index 6638633..b3382ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
@@ -38,6 +38,7 @@ public class InputHost {
   private final int port;
   private final int srcPhysicalIndex;
   private final String identifier;
+  private String additionalInfo;
 
   private final BlockingQueue<InputAttemptIdentifier> inputs = new 
LinkedBlockingQueue<InputAttemptIdentifier>();
 
@@ -64,6 +65,14 @@ public class InputHost {
     return this.identifier;
   }
 
+  public void setAdditionalInfo(String additionalInfo) {
+    this.additionalInfo = additionalInfo;
+  }
+
+  public String getAdditionalInfo() {
+    return (additionalInfo == null) ? "" : additionalInfo;
+  }
+
   public int getSrcPhysicalIndex() {
     return this.srcPhysicalIndex;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/6cb82062/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 600c332..10a0050 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -339,7 +339,6 @@ public class ShuffleManager implements FetcherCallback {
                 LOG.debug("Processing pending host: " + 
inputHost.toDetailedString());
               }
               if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
-                LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
                 Fetcher fetcher = constructFetcherForHost(inputHost, conf);
                 runningFetchers.add(fetcher);
                 if (isShutdown.get()) {
@@ -452,6 +451,7 @@ public class ShuffleManager implements FetcherCallback {
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
     LOG.info("Created Fetcher for host: " + inputHost.getHost()
+        + ", info: " + inputHost.getAdditionalInfo()
         + ", with inputs: " + pendingInputsForHost);
     return fetcherBuilder.build();
   }
@@ -953,6 +953,7 @@ public class ShuffleManager implements FetcherCallback {
           for (InputAttemptIdentifier input : pendingInputs) {
             inputHost.addKnownInput(input);
           }
+          inputHost.setAdditionalInfo(result.getAdditionalInfo());
           pendingHosts.add(inputHost);
         }
         doBookKeepingForFetcherComplete();

Reply via email to