Repository: tez
Updated Branches:
  refs/heads/master 3ec953753 -> d0b189af6


Revert "TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)"

This reverts commit f7f60385cf5d8d66a1ee02e64d1e671bf1ad8771.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f5d4c36c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f5d4c36c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f5d4c36c

Branch: refs/heads/master
Commit: f5d4c36cc2fbce8e4075b2f7385faf7a13a862c3
Parents: 3ec9537
Author: Zhiyuan Yang <[email protected]>
Authored: Mon Apr 3 11:18:42 2017 -0700
Committer: Zhiyuan Yang <[email protected]>
Committed: Mon Apr 3 11:18:42 2017 -0700

----------------------------------------------------------------------
 .../orderedgrouped/FetcherOrderedGrouped.java   | 10 +++
 .../orderedgrouped/ShuffleClientMetrics.java    | 92 ++++++++++++++++++++
 .../orderedgrouped/ShuffleScheduler.java        |  6 +-
 .../shuffle/orderedgrouped/TestFetcher.java     | 27 ++++--
 4 files changed, 125 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 5cad6fc..58ca1e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -69,6 +69,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final TezCounter wrongReduceErrs;
   private final FetchedInputAllocatorOrderedGrouped allocator;
   private final ShuffleScheduler scheduler;
+  private final ShuffleClientMetrics metrics;
   private final ExceptionReporter exceptionReporter;
   private final int id;
   private final String logIdentifier;
@@ -106,6 +107,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
                                ShuffleScheduler scheduler,
                                FetchedInputAllocatorOrderedGrouped allocator,
+                               ShuffleClientMetrics metrics,
                                ExceptionReporter exceptionReporter, 
JobTokenSecretManager jobTokenSecretMgr,
                                boolean ifileReadAhead, int 
ifileReadAheadLength,
                                CompressionCodec codec,
@@ -128,6 +130,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                boolean verifyDiskChecksum) {
     this.scheduler = scheduler;
     this.allocator = allocator;
+    this.metrics = metrics;
     this.exceptionReporter = exceptionReporter;
     this.mapHost = mapHost;
     this.currentPartition = this.mapHost.getPartitionId();
@@ -166,6 +169,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   @VisibleForTesting
   protected void fetchNext() throws InterruptedException, IOException {
     try {
+      metrics.threadBusy();
+
       if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) 
&& mapHost.getPort() == localShufflePort) {
         setupLocalDiskFetch(mapHost);
       } else {
@@ -175,6 +180,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     } finally {
       cleanupCurrentConnection(false);
       scheduler.freeHost(mapHost);
+      metrics.threadFree();
     }
   }
 
@@ -518,6 +524,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                               endTime - startTime, mapOutput, false);
       // Note successful shuffle
       remaining.remove(srcAttemptId.toString());
+      metrics.successFetch();
       return null;
     } catch (IOException ioe) {
       if (stopped) {
@@ -555,6 +562,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
+      metrics.failedFetch();
       return new InputAttemptIdentifier[] {srcAttemptId};
     }
   }
@@ -677,11 +685,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
           scheduler.copySucceeded(srcAttemptId, host, 
indexRecord.getPartLength(),
               indexRecord.getRawLength(), (endTime - startTime), mapOutput, 
true);
           iter.remove();
+          metrics.successFetch();
         } catch (IOException e) {
           if (mapOutput != null) {
             mapOutput.abort();
           }
           if (!stopped) {
+            metrics.failedFetch();
             ioErrs.increment(1);
             scheduler.copyFailed(srcAttemptId, host, true, false, true);
             LOG.warn("Failed to read local disk output of " + srcAttemptId + " 
from " +

http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
new file mode 100644
index 0000000..f297dad
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+
+class ShuffleClientMetrics implements Updater {
+
+  private MetricsRecord shuffleMetrics = null;
+  private int numFailedFetches = 0;
+  private int numSuccessFetches = 0;
+  private long numBytes = 0;
+  private int numThreadsBusy = 0;
+  private final int numCopiers;
+  
+  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, 
Configuration conf, 
+      String user) {
+    this.numCopiers = 
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
+
+    MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
+    this.shuffleMetrics = 
+      MetricsUtil.createRecord(metricsContext, "shuffleInput");
+    this.shuffleMetrics.setTag("user", user);
+    this.shuffleMetrics.setTag("dagName", dagName);
+    this.shuffleMetrics.setTag("taskId", 
TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex));
+    this.shuffleMetrics.setTag("sessionId", 
+        conf.get(
+            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID,
+            
TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT));
+    metricsContext.registerUpdater(this);
+  }
+  public synchronized void inputBytes(long numBytes) {
+    this.numBytes += numBytes;
+  }
+  public synchronized void failedFetch() {
+    ++numFailedFetches;
+  }
+  public synchronized void successFetch() {
+    ++numSuccessFetches;
+  }
+  public synchronized void threadBusy() {
+    ++numThreadsBusy;
+  }
+  public synchronized void threadFree() {
+    --numThreadsBusy;
+  }
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
+      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
+                                numFailedFetches);
+      shuffleMetrics.incrMetric("shuffle_success_fetches", 
+                                numSuccessFetches);
+      if (numCopiers != 0) {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
+            100*((float)numThreadsBusy/numCopiers));
+      } else {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
+      }
+      numBytes = 0;
+      numSuccessFetches = 0;
+      numFailedFetches = 0;
+    }
+    shuffleMetrics.update();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 953c73e..cce486c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -204,6 +204,7 @@ class ShuffleScheduler {
 
   private final HttpConnectionParams httpConnectionParams;
   private final FetchedInputAllocatorOrderedGrouped allocator;
+  private final ShuffleClientMetrics shuffleMetrics;
   private final ExceptionReporter exceptionReporter;
   private final MergeManager mergeManager;
   private final JobTokenSecretManager jobTokenSecretManager;
@@ -369,6 +370,9 @@ class ShuffleScheduler {
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
     this.asyncHttp = 
conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, 
false);
     this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
+    this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
     SecretKey jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
             
.getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
@@ -1387,7 +1391,7 @@ class ShuffleScheduler {
   @VisibleForTesting
   FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
     return new FetcherOrderedGrouped(httpConnectionParams, 
ShuffleScheduler.this, allocator,
-        exceptionReporter, jobTokenSecretManager, ifileReadAhead, 
ifileReadAheadLength,
+        shuffleMetrics, exceptionReporter, jobTokenSecretManager, 
ifileReadAhead, ifileReadAheadLength,
         codec, conf, localDiskFetchEnabled, localHostname, shufflePort, 
srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
         connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, 
asyncHttp, sslShuffle,

http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index a9b57a9..310f1b2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -110,6 +110,7 @@ public class TestFetcher {
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
 
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
 
     InputContext inputContext = mock(InputContext.class);
@@ -123,7 +124,7 @@ public class TestFetcher {
     doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
 
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, 
null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -141,6 +142,7 @@ public class TestFetcher {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
 
     InputContext inputContext = mock(InputContext.class);
@@ -151,7 +153,7 @@ public class TestFetcher {
     final boolean DISABLE_LOCAL_FETCH = false;
     MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, 
null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -169,7 +171,7 @@ public class TestFetcher {
     // if hostname does not match use http
     mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
     fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, 
null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -185,7 +187,7 @@ public class TestFetcher {
     // if port does not match use http
     mapHost = new MapHost(HOST, PORT + 1, 0);
     fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, 
null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -200,7 +202,7 @@ public class TestFetcher {
 
     //if local fetch is not enabled
     mapHost = new MapHost(HOST, PORT, 0);
-    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, 
null, false, 0,
+    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, 
shuffle, null, false, 0,
         null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
         wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
@@ -219,13 +221,14 @@ public class TestFetcher {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
 
     MapHost host = new MapHost(HOST, PORT, 1);
-    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle, null, false, 0,
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, metrics, shuffle, null, false, 0,
         null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
         false, false, true);
@@ -297,6 +300,9 @@ public class TestFetcher {
     verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), 
host, true, false, true);
     verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), 
host, true, false, true);
 
+    verify(metrics, times(3)).successFetch();
+    verify(metrics, times(2)).failedFetch();
+
     verify(spyFetcher).putBackRemainingMapOutputs(host);
     verify(scheduler).putBackKnownMapOutput(host, 
srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
     verify(scheduler).putBackKnownMapOutput(host, 
srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
@@ -358,6 +364,7 @@ public class TestFetcher {
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
 
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
@@ -366,7 +373,7 @@ public class TestFetcher {
 
     HttpConnectionParams httpConnectionParams = 
ShuffleUtils.getHttpConnectionParams(conf);
     final MapHost host = new MapHost(HOST, PORT, 1);
-    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, 
scheduler, merger, shuffle, null, false, 0,
+    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, 
scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
         false, false, true);
@@ -442,6 +449,7 @@ public class TestFetcher {
 
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
 
     TezCounters counters = new TezCounters();
@@ -455,7 +463,7 @@ public class TestFetcher {
     HttpConnectionParams httpConnectionParams = 
ShuffleUtils.getHttpConnectionParams(conf);
     final MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped mockFetcher =
-        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, 
shuffle, jobMgr,
+        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, 
metrics, shuffle, jobMgr,
             false, 0,
             null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
@@ -520,10 +528,11 @@ public class TestFetcher {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
     MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, 
null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,

Reply via email to