Repository: spark
Updated Branches:
  refs/heads/master b96fd44f0 -> a802c69b1


[SPARK-18364][YARN] Expose metrics for YarnShuffleService

## What changes were proposed in this pull request?

This PR is a follow-up of the closed https://github.com/apache/spark/pull/17401,
which was closed only due to inactivity, but it is still a nice feature to have.
The review feedback from jerryshao was taken into consideration and the following edits were made:
- VisibleForTesting deleted because of dependency conflicts
- removed unnecessary reflection for `MetricsSystemImpl`
- added more available types for gauge

## How was this patch tested?

Manual deploy of the new yarn-shuffle jar into a Node Manager and verification that
the metrics appear in the Node Manager's standard location. This is JMX with a
query endpoint running on `hostname:port`.

Resulting metrics look like this:
```
curl -sk -XGET hostname:port |  grep -v '#' | grep 'shuffleService'
hadoop_nodemanager_openblockrequestlatencymillis_rate15{name="shuffleService",} 
0.31428910657834713
hadoop_nodemanager_blocktransferratebytes_rate15{name="shuffleService",} 
566144.9983653595
hadoop_nodemanager_blocktransferratebytes_ratemean{name="shuffleService",} 
2464409.9678099006
hadoop_nodemanager_openblockrequestlatencymillis_rate1{name="shuffleService",} 
1.2893844732240272
hadoop_nodemanager_registeredexecutorssize{name="shuffleService",} 2.0
hadoop_nodemanager_openblockrequestlatencymillis_ratemean{name="shuffleService",}
 1.255574678369966
hadoop_nodemanager_openblockrequestlatencymillis_count{name="shuffleService",} 
315.0
hadoop_nodemanager_openblockrequestlatencymillis_rate5{name="shuffleService",} 
0.7661929192569739
hadoop_nodemanager_registerexecutorrequestlatencymillis_ratemean{name="shuffleService",}
 0.0
hadoop_nodemanager_registerexecutorrequestlatencymillis_count{name="shuffleService",}
 0.0
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate1{name="shuffleService",}
 0.0
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate5{name="shuffleService",}
 0.0
hadoop_nodemanager_blocktransferratebytes_count{name="shuffleService",} 
6.18271213E8
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate15{name="shuffleService",}
 0.0
hadoop_nodemanager_blocktransferratebytes_rate5{name="shuffleService",} 
1154114.4881816586
hadoop_nodemanager_blocktransferratebytes_rate1{name="shuffleService",} 
574745.0749848988
```

Closes #22485 from mareksimunek/SPARK-18364.

Lead-authored-by: marek.simunek <marek.simu...@firma.seznam.cz>
Co-authored-by: Andrew Ash <and...@andrewash.com>
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a802c69b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a802c69b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a802c69b

Branch: refs/heads/master
Commit: a802c69b130b69a35b372ffe1b01289577f6fafb
Parents: b96fd44
Author: marek.simunek <marek.simu...@firma.seznam.cz>
Authored: Mon Oct 1 11:04:37 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Mon Oct 1 11:04:37 2018 -0500

----------------------------------------------------------------------
 .../spark/network/yarn/YarnShuffleService.java  |  11 ++
 .../network/yarn/YarnShuffleServiceMetrics.java | 137 +++++++++++++++++++
 .../yarn/YarnShuffleServiceMetricsSuite.scala   |  73 ++++++++++
 3 files changed, 221 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a802c69b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index d8b2ed6..72ae1a1 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.*;
 import org.apache.spark.network.util.LevelDBProvider;
@@ -168,6 +170,15 @@ public class YarnShuffleService extends AuxiliaryService {
       TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(conf));
       blockHandler = new ExternalShuffleBlockHandler(transportConf, 
registeredExecutorFile);
 
+      // register metrics on the block handler into the Node Manager's metrics 
system.
+      YarnShuffleServiceMetrics serviceMetrics =
+        new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
+
+      MetricsSystemImpl metricsSystem = (MetricsSystemImpl) 
DefaultMetricsSystem.instance();
+      metricsSystem.register(
+        "sparkShuffleService", "Metrics on the Spark Shuffle Service", 
serviceMetrics);
+      logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
+
       // If authentication is enabled, set up the shuffle server to use a
       // special RPC handler that filters out unauthenticated fetch requests
       List<TransportServerBootstrap> bootstraps = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/spark/blob/a802c69b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
----------------------------------------------------------------------
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
new file mode 100644
index 0000000..3e4d479
--- /dev/null
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn;
+
+import java.util.Map;
+
+import com.codahale.metrics.*;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+
+/**
+ * Forwards {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}
+ * to the Hadoop metrics2 system.
+ * The NodeManager by default exposes a JMX endpoint where these can be collected.
+ */
+class YarnShuffleServiceMetrics implements MetricsSource {
+
+  // Dropwizard metrics exposed by the shuffle block handler.
+  private final MetricSet metricSet;
+
+  YarnShuffleServiceMetrics(MetricSet metricSet) {
+    this.metricSet = metricSet;
+  }
+
+  /**
+   * Get metrics from the source
+   *
+   * @param collector to contain the resulting metrics snapshot
+   * @param all       if true, return all metrics even if unchanged.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // All shuffle-service metrics are emitted under a single record.
+    MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
+
+    for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
+      collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Translates a single Dropwizard metric into the Hadoop metrics2 record builder.
+   * Covers the metric types used in
+   * {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}.
+   * Visible for testing.
+   *
+   * @param metricsRecordBuilder builder to append the translated values to
+   * @param name base metric name; suffixes such as "_count" and "_rate15" are appended
+   * @param metric the Dropwizard metric to translate
+   */
+  public static void collectMetric(
+    MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
+
+    if (metric instanceof Timer) {
+      Timer t = (Timer) metric;
+      metricsRecordBuilder
+        .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
+          t.getCount())
+        .addGauge(
+          new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
+          t.getFifteenMinuteRate())
+        .addGauge(
+          new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
+          t.getFiveMinuteRate())
+        .addGauge(
+          new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
+          t.getOneMinuteRate())
+        .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
+          t.getMeanRate());
+    } else if (metric instanceof Meter) {
+      Meter m = (Meter) metric;
+      metricsRecordBuilder
+        .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
+          m.getCount())
+        .addGauge(
+          new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
+          m.getFifteenMinuteRate())
+        .addGauge(
+          new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
+          m.getFiveMinuteRate())
+        .addGauge(
+          new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
+          m.getOneMinuteRate())
+        .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
+          m.getMeanRate());
+    } else if (metric instanceof Counter) {
+      // Counters were previously dropped silently; expose their count so any Counter
+      // added to the shuffle metrics set is reported rather than lost.
+      Counter c = (Counter) metric;
+      metricsRecordBuilder.addCounter(
+        new ShuffleServiceMetricsInfo(name + "_count", "Count of counter " + name),
+        c.getCount());
+    } else if (metric instanceof Gauge) {
+      // Gauge values are untyped in Dropwizard; dispatch on the runtime type of the value.
+      final Object gaugeValue = ((Gauge<?>) metric).getValue();
+      if (gaugeValue instanceof Integer) {
+        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue);
+      } else if (gaugeValue instanceof Long) {
+        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue);
+      } else if (gaugeValue instanceof Float) {
+        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue);
+      } else if (gaugeValue instanceof Double) {
+        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue);
+      } else {
+        throw new IllegalStateException(
+                "Not supported class type of metric[" + name + "] for value " + gaugeValue);
+      }
+    }
+    // Other metric types (e.g. Histogram) are currently ignored; they are not
+    // produced by ShuffleMetrics as far as this class is concerned.
+  }
+
+  private static MetricsInfo getShuffleServiceMetricsInfo(String name) {
+    return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
+  }
+
+  /** Simple immutable (name, description) pair used to label each emitted metric. */
+  private static class ShuffleServiceMetricsInfo implements MetricsInfo {
+
+    private final String name;
+    private final String description;
+
+    ShuffleServiceMetricsInfo(String name, String description) {
+      this.name = name;
+      this.description = description;
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public String description() {
+      return description;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a802c69b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
new file mode 100644
index 0000000..40b9228
--- /dev/null
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.yarn
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder
+import org.mockito.Matchers._
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.server.OneForOneStreamManager
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, 
ExternalShuffleBlockResolver}
+
+class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
+
+  // Mocked shuffle internals; only the registered-executors size is stubbed so the
+  // "registeredExecutorsSize" gauge has a concrete value to report.
+  val streamManager = mock(classOf[OneForOneStreamManager])
+  val blockResolver = mock(classOf[ExternalShuffleBlockResolver])
+  when(blockResolver.getRegisteredExecutorsSize).thenReturn(42)
+
+  // The real metric set produced by the block handler, built on the mocks above.
+  val metrics = new ExternalShuffleBlockHandler(streamManager, 
blockResolver).getAllMetrics
+
+  test("metrics named as expected") {
+    val allMetrics = Set(
+      "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
+      "blockTransferRateBytes", "registeredExecutorsSize")
+
+    metrics.getMetrics.keySet().asScala should be (allMetrics)
+  }
+
+  // these three metrics have the same effect on the collector
+  for (testname <- Seq("openBlockRequestLatencyMillis",
+      "registerExecutorRequestLatencyMillis",
+      "blockTransferRateBytes")) {
+    test(s"$testname - collector receives correct types") {
+      val builder = mock(classOf[MetricsRecordBuilder])
+      // Stub the fluent API: addCounter/addGauge must return the builder itself so
+      // the chained calls inside collectMetric do not hit a null from the mock.
+      when(builder.addCounter(any(), anyLong())).thenReturn(builder)
+      when(builder.addGauge(any(), anyDouble())).thenReturn(builder)
+
+      YarnShuffleServiceMetrics.collectMetric(builder, testname,
+        metrics.getMetrics.get(testname))
+
+      // One count plus four rate gauges (1/5/15-minute and mean) per timer/meter.
+      verify(builder).addCounter(anyObject(), anyLong())
+      verify(builder, times(4)).addGauge(anyObject(), anyDouble())
+    }
+  }
+
+  // this metric writes only one gauge to the collector
+  test("registeredExecutorsSize - collector receives correct types") {
+    val builder = mock(classOf[MetricsRecordBuilder])
+
+    YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize",
+      metrics.getMetrics.get("registeredExecutorsSize"))
+
+    // only one
+    verify(builder).addGauge(anyObject(), anyInt())
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to