This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 568512c [SPARK-27773][SHUFFLE] add metrics for number of exceptions caught in ExternalShuffleBlockHandler 568512c is described below commit 568512cc82562f9f7d7da0a47bc59fafb86f3ebc Author: Steven Rand <sr...@palantir.com> AuthorDate: Thu May 30 13:57:15 2019 -0700 [SPARK-27773][SHUFFLE] add metrics for number of exceptions caught in ExternalShuffleBlockHandler ## What changes were proposed in this pull request? Add a metric for the number of exceptions caught in the `ExternalShuffleBlockHandler`, the idea being that spikes in this metric over some time window (or, more desirably, the lack thereof) can be used as an indicator of the health of an external shuffle service. (Here, "health" refers to its ability to successfully respond to client requests.) ## How was this patch tested? Deployed a build of this PR to a YARN cluster, and confirmed that the NodeManagers' JMX metrics include `numCaughtExceptions`. Closes #24645 from sjrand/SPARK-27773. 
Authored-by: Steven Rand <sr...@palantir.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../network/shuffle/ExternalShuffleBlockHandler.java | 8 ++++++++ .../spark/network/yarn/YarnShuffleServiceMetrics.java | 17 ++++++++++------- .../network/yarn/YarnShuffleServiceMetricsSuite.scala | 2 +- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index cb2d01d..9b7bf25 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -150,6 +150,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler { } } + @Override + public void exceptionCaught(Throwable cause, TransportClient client) { + metrics.caughtExceptions.inc(); + } + public MetricSet getAllMetrics() { return metrics; } @@ -215,6 +220,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler { private Counter activeConnections = new Counter(); // Number of registered connections to the shuffle service private Counter registeredConnections = new Counter(); + // Number of exceptions caught in connections to the shuffle service + private Counter caughtExceptions = new Counter(); public ShuffleMetrics() { allMetrics = new HashMap<>(); @@ -225,6 +232,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler { (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize()); allMetrics.put("numActiveConnections", activeConnections); allMetrics.put("numRegisteredConnections", registeredConnections); + allMetrics.put("numCaughtExceptions", caughtExceptions); } @Override diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 5012374..6d4e568 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -96,13 +96,13 @@ class YarnShuffleServiceMetrics implements MetricsSource { } else if (metric instanceof Gauge) { final Object gaugeValue = ((Gauge) metric).getValue(); if (gaugeValue instanceof Integer) { - metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue); + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), (Integer) gaugeValue); } else if (gaugeValue instanceof Long) { - metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue); + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), (Long) gaugeValue); } else if (gaugeValue instanceof Float) { - metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue); + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), (Float) gaugeValue); } else if (gaugeValue instanceof Double) { - metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue); + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), (Double) gaugeValue); } else { throw new IllegalStateException( "Not supported class type of metric[" + name + "] for value " + gaugeValue); @@ -110,15 +110,18 @@ class YarnShuffleServiceMetrics implements MetricsSource { } else if (metric instanceof Counter) { Counter c = (Counter) metric; long counterValue = c.getCount(); - metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " + - "connections to shuffle service " + name), counterValue); + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForCounter(name), counterValue); } } - private static MetricsInfo 
getShuffleServiceMetricsInfo(String name) { + private static MetricsInfo getShuffleServiceMetricsInfoForGauge(String name) { return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name); } + private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForCounter(String name) { + return new ShuffleServiceMetricsInfo(name, "Value of counter " + name); + } + private static class ShuffleServiceMetricsInfo implements MetricsInfo { private final String name; diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index f538cbc..3736681 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -39,7 +39,7 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { val allMetrics = Set( "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", - "numRegisteredConnections") + "numRegisteredConnections", "numCaughtExceptions") metrics.getMetrics.keySet().asScala should be (allMetrics) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org