This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd29fe  [SPARK-25642][YARN] Adding two new metrics to record the 
number of registered connections as well as the number of active connections to 
YARN Shuffle Service
8dd29fe is described below

commit 8dd29fe36b781d115213b1d6a8446ad04e9239bb
Author: pgandhi <pgan...@oath.com>
AuthorDate: Fri Dec 21 11:28:22 2018 -0800

    [SPARK-25642][YARN] Adding two new metrics to record the number of 
registered connections as well as the number of active connections to YARN 
Shuffle Service
    
    Recently, the ability to expose the metrics for YARN Shuffle Service was 
added as part of [SPARK-18364](https://github.com/apache/spark/pull/22485). We 
need to add some metrics to be able to determine the number of active 
connections as well as open connections to the external shuffle service to 
benchmark network and connection issues in large cluster environments.
    
    Added two more shuffle server metrics for the Spark YARN shuffle service: 
numRegisteredConnections, which indicates the number of registered connections to 
the shuffle service, and numActiveConnections, which indicates the number of 
active connections to the shuffle service at any given point in time.
    
    If these metrics are output to a file, we get something like this:
    
    1533674653489 default.shuffleService: Hostname=server1.abc.com, 
openBlockRequestLatencyMillis_count=729, 
openBlockRequestLatencyMillis_rate15=0.7110833548897356, 
openBlockRequestLatencyMillis_rate5=1.657808981793011, 
openBlockRequestLatencyMillis_rate1=2.2404486061620474, 
openBlockRequestLatencyMillis_rateMean=0.9242558551196706,
    numRegisteredConnections=35,
    blockTransferRateBytes_count=2635880512, 
blockTransferRateBytes_rate15=2578547.6094160094, 
blockTransferRateBytes_rate5=6048721.726302424, 
blockTransferRateBytes_rate1=8548922.518223226, 
blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, 
registerExecutorRequestLatencyMillis_count=5, 
registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, 
registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, 
registerExecutorRequestLatencyMillis_rate1=2. [...]
    
    Closes #22498 from pgandhi999/SPARK-18364.
    
    Authored-by: pgandhi <pgan...@oath.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/network/TransportContext.java |  9 +++++++-
 .../network/server/TransportChannelHandler.java    | 18 +++++++++++++++-
 .../spark/network/server/TransportServer.java      |  5 +++++
 .../shuffle/ExternalShuffleBlockHandler.java       | 24 ++++++++++++++++++++--
 .../spark/network/yarn/YarnShuffleService.java     | 21 +++++++++++--------
 .../network/yarn/YarnShuffleServiceMetrics.java    |  5 +++++
 .../spark/deploy/ExternalShuffleService.scala      |  2 ++
 .../yarn/YarnShuffleServiceMetricsSuite.scala      |  3 ++-
 8 files changed, 73 insertions(+), 14 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 480b526..1a3f3f2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -20,6 +20,7 @@ package org.apache.spark.network;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.codahale.metrics.Counter;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
@@ -66,6 +67,8 @@ public class TransportContext {
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
   private final boolean isClientOnly;
+  // Number of registered connections to the shuffle service
+  private Counter registeredConnections = new Counter();
 
   /**
    * Force to create MessageEncoder and MessageDecoder so that we can make 
sure they will be created
@@ -221,7 +224,7 @@ public class TransportContext {
     TransportRequestHandler requestHandler = new 
TransportRequestHandler(channel, client,
       rpcHandler, conf.maxChunksBeingTransferred());
     return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs(), closeIdleConnections);
+      conf.connectionTimeoutMs(), closeIdleConnections, this);
   }
 
   /**
@@ -234,4 +237,8 @@ public class TransportContext {
   }
 
   public TransportConf getConf() { return conf; }
+
+  public Counter getRegisteredConnections() {
+    return registeredConnections;
+  }
 }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index c824a7b..ca81099 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.spark.network.TransportContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,18 +58,21 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
   private final TransportRequestHandler requestHandler;
   private final long requestTimeoutNs;
   private final boolean closeIdleConnections;
+  private final TransportContext transportContext;
 
   public TransportChannelHandler(
       TransportClient client,
       TransportResponseHandler responseHandler,
       TransportRequestHandler requestHandler,
       long requestTimeoutMs,
-      boolean closeIdleConnections) {
+      boolean closeIdleConnections,
+      TransportContext transportContext) {
     this.client = client;
     this.responseHandler = responseHandler;
     this.requestHandler = requestHandler;
     this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
     this.closeIdleConnections = closeIdleConnections;
+    this.transportContext = transportContext;
   }
 
   public TransportClient getClient() {
@@ -176,4 +180,16 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
     return responseHandler;
   }
 
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+    transportContext.getRegisteredConnections().inc();
+    super.channelRegistered(ctx);
+  }
+
+  @Override
+  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+    transportContext.getRegisteredConnections().dec();
+    super.channelUnregistered(ctx);
+  }
+
 }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 9c85ab2..eb5f10a 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.MetricSet;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -159,4 +160,8 @@ public class TransportServer implements Closeable {
     }
     bootstrap = null;
   }
+
+  public Counter getRegisteredConnections() {
+    return context.getRegisteredConnections();
+  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 098fa79..788a845 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
+import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,7 +174,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler 
{
   /**
    * A simple class to wrap all shuffle service wrapper metrics
    */
-  private class ShuffleMetrics implements MetricSet {
+  @VisibleForTesting
+  public class ShuffleMetrics implements MetricSet {
     private final Map<String, Metric> allMetrics;
     // Time latency for open block request in ms
     private final Timer openBlockRequestLatencyMillis = new Timer();
@@ -181,14 +183,20 @@ public class ExternalShuffleBlockHandler extends 
RpcHandler {
     private final Timer registerExecutorRequestLatencyMillis = new Timer();
     // Block transfer rate in byte per second
     private final Meter blockTransferRateBytes = new Meter();
+    // Number of active connections to the shuffle service
+    private Counter activeConnections = new Counter();
+    // Number of registered connections to the shuffle service
+    private Counter registeredConnections = new Counter();
 
-    private ShuffleMetrics() {
+    public ShuffleMetrics() {
       allMetrics = new HashMap<>();
       allMetrics.put("openBlockRequestLatencyMillis", 
openBlockRequestLatencyMillis);
       allMetrics.put("registerExecutorRequestLatencyMillis", 
registerExecutorRequestLatencyMillis);
       allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
       allMetrics.put("registeredExecutorsSize",
                      (Gauge<Integer>) () -> 
blockManager.getRegisteredExecutorsSize());
+      allMetrics.put("numActiveConnections", activeConnections);
+      allMetrics.put("numRegisteredConnections", registeredConnections);
     }
 
     @Override
@@ -244,4 +252,16 @@ public class ExternalShuffleBlockHandler extends 
RpcHandler {
     }
   }
 
+  @Override
+  public void channelActive(TransportClient client) {
+    metrics.activeConnections.inc();
+    super.channelActive(client);
+  }
+
+  @Override
+  public void channelInactive(TransportClient client) {
+    metrics.activeConnections.dec();
+    super.channelInactive(client);
+  }
+
 }
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 72ae1a1..7e8d3b2 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -170,15 +170,6 @@ public class YarnShuffleService extends AuxiliaryService {
       TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(conf));
       blockHandler = new ExternalShuffleBlockHandler(transportConf, 
registeredExecutorFile);
 
-      // register metrics on the block handler into the Node Manager's metrics 
system.
-      YarnShuffleServiceMetrics serviceMetrics =
-        new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
-
-      MetricsSystemImpl metricsSystem = (MetricsSystemImpl) 
DefaultMetricsSystem.instance();
-      metricsSystem.register(
-        "sparkShuffleService", "Metrics on the Spark Shuffle Service", 
serviceMetrics);
-      logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
-
       // If authentication is enabled, set up the shuffle server to use a
       // special RPC handler that filters out unauthenticated fetch requests
       List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
@@ -199,6 +190,18 @@ public class YarnShuffleService extends AuxiliaryService {
       port = shuffleServer.getPort();
       boundPort = port;
       String authEnabledString = authEnabled ? "enabled" : "not enabled";
+
+      // register metrics on the block handler into the Node Manager's metrics 
system.
+      blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
+          shuffleServer.getRegisteredConnections());
+      YarnShuffleServiceMetrics serviceMetrics =
+          new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
+
+      MetricsSystemImpl metricsSystem = (MetricsSystemImpl) 
DefaultMetricsSystem.instance();
+      metricsSystem.register(
+          "sparkShuffleService", "Metrics on the Spark Shuffle Service", 
serviceMetrics);
+      logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
+
       logger.info("Started YARN shuffle service for Spark on port {}. " +
         "Authentication is {}.  Registered executor file is {}", port, 
authEnabledString,
         registeredExecutorFile);
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
index 3e4d479..5012374 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
@@ -107,6 +107,11 @@ class YarnShuffleServiceMetrics implements MetricsSource {
         throw new IllegalStateException(
                 "Not supported class type of metric[" + name + "] for value " 
+ gaugeValue);
       }
+    } else if (metric instanceof Counter) {
+      Counter c = (Counter) metric;
+      long counterValue = c.getCount();
+      metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, 
"Number of " +
+          "connections to shuffle service " + name), counterValue);
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index f6b3c37..03e3abb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, 
securityManager: SecurityMana
     server = transportContext.createServer(port, bootstraps.asJava)
 
     shuffleServiceSource.registerMetricSet(server.getAllMetrics)
+    blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections",
+        server.getRegisteredConnections)
     shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics)
     masterMetricsSystem.registerSource(shuffleServiceSource)
     masterMetricsSystem.start()
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
index 40b9228..952fd0b 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
@@ -38,7 +38,8 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite 
with Matchers {
   test("metrics named as expected") {
     val allMetrics = Set(
       "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
-      "blockTransferRateBytes", "registeredExecutorsSize")
+      "blockTransferRateBytes", "registeredExecutorsSize", 
"numActiveConnections",
+      "numRegisteredConnections")
 
     metrics.getMetrics.keySet().asScala should be (allMetrics)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to