This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 568512c  [SPARK-27773][SHUFFLE] add metrics for number of exceptions caught in ExternalShuffleBlockHandler
568512c is described below

commit 568512cc82562f9f7d7da0a47bc59fafb86f3ebc
Author: Steven Rand <sr...@palantir.com>
AuthorDate: Thu May 30 13:57:15 2019 -0700

    [SPARK-27773][SHUFFLE] add metrics for number of exceptions caught in ExternalShuffleBlockHandler
    
    ## What changes were proposed in this pull request?
    
    Add a metric for the number of exceptions caught in the `ExternalShuffleBlockHandler`. The idea is that spikes in this metric over some time window (or, more desirably, the lack thereof) can be used as an indicator of the health of an external shuffle service, where "health" refers to its ability to respond successfully to client requests.
    
    ## How was this patch tested?
    
    Deployed a build of this PR to a YARN cluster and confirmed that the NodeManagers' JMX metrics include `numCaughtExceptions`.
    
    Closes #24645 from sjrand/SPARK-27773.
    
    Authored-by: Steven Rand <sr...@palantir.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../network/shuffle/ExternalShuffleBlockHandler.java    |  8 ++++++++
 .../spark/network/yarn/YarnShuffleServiceMetrics.java   | 17 ++++++++++-------
 .../network/yarn/YarnShuffleServiceMetricsSuite.scala   |  2 +-
 3 files changed, 19 insertions(+), 8 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index cb2d01d..9b7bf25 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -150,6 +150,11 @@ public class ExternalShuffleBlockHandler extends 
RpcHandler {
     }
   }
 
+  @Override
+  public void exceptionCaught(Throwable cause, TransportClient client) {
+    metrics.caughtExceptions.inc();
+  }
+
   public MetricSet getAllMetrics() {
     return metrics;
   }
@@ -215,6 +220,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler 
{
     private Counter activeConnections = new Counter();
     // Number of registered connections to the shuffle service
     private Counter registeredConnections = new Counter();
+    // Number of exceptions caught in connections to the shuffle service
+    private Counter caughtExceptions = new Counter();
 
     public ShuffleMetrics() {
       allMetrics = new HashMap<>();
@@ -225,6 +232,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler 
{
                      (Gauge<Integer>) () -> 
blockManager.getRegisteredExecutorsSize());
       allMetrics.put("numActiveConnections", activeConnections);
       allMetrics.put("numRegisteredConnections", registeredConnections);
+      allMetrics.put("numCaughtExceptions", caughtExceptions);
     }
 
     @Override
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
index 5012374..6d4e568 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
@@ -96,13 +96,13 @@ class YarnShuffleServiceMetrics implements MetricsSource {
     } else if (metric instanceof Gauge) {
       final Object gaugeValue = ((Gauge) metric).getValue();
       if (gaugeValue instanceof Integer) {
-        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), 
(Integer) gaugeValue);
+        
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), 
(Integer) gaugeValue);
       } else if (gaugeValue instanceof Long) {
-        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), 
(Long) gaugeValue);
+        
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), 
(Long) gaugeValue);
       } else if (gaugeValue instanceof Float) {
-        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), 
(Float) gaugeValue);
+        
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), 
(Float) gaugeValue);
       } else if (gaugeValue instanceof Double) {
-        metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), 
(Double) gaugeValue);
+        
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForGauge(name), 
(Double) gaugeValue);
       } else {
         throw new IllegalStateException(
                 "Not supported class type of metric[" + name + "] for value " 
+ gaugeValue);
@@ -110,15 +110,18 @@ class YarnShuffleServiceMetrics implements MetricsSource {
     } else if (metric instanceof Counter) {
       Counter c = (Counter) metric;
       long counterValue = c.getCount();
-      metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, 
"Number of " +
-          "connections to shuffle service " + name), counterValue);
+      
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForCounter(name), 
counterValue);
     }
   }
 
-  private static MetricsInfo getShuffleServiceMetricsInfo(String name) {
+  private static MetricsInfo getShuffleServiceMetricsInfoForGauge(String name) 
{
     return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
   }
 
+  private static ShuffleServiceMetricsInfo 
getShuffleServiceMetricsInfoForCounter(String name) {
+    return new ShuffleServiceMetricsInfo(name, "Value of counter " + name);
+  }
+
   private static class ShuffleServiceMetricsInfo implements MetricsInfo {
 
     private final String name;
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
index f538cbc..3736681 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
@@ -39,7 +39,7 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite 
with Matchers {
     val allMetrics = Set(
       "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
       "blockTransferRateBytes", "registeredExecutorsSize", 
"numActiveConnections",
-      "numRegisteredConnections")
+      "numRegisteredConnections", "numCaughtExceptions")
 
     metrics.getMetrics.keySet().asScala should be (allMetrics)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to