This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a5ba479f [#640] feat(netty): Metric system for netty server (#1041)
a5ba479f is described below

commit a5ba479f3967a4732616a0b352b1882f1dc85a87
Author: xumanbu <[email protected]>
AuthorDate: Fri Jul 28 10:39:04 2023 +0800

    [#640] feat(netty): Metric system for netty server (#1041)
    
    ### What changes were proposed in this pull request?
    
    Add NettyMetrics for StreamServer.
    
    ### Why are the changes needed?
    
    Fix: #640
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit tests: ShuffleServerMetricsTest#testNettyMetrics.
    
    Co-authored-by: jam.xu <[email protected]>
---
 .../apache/uniffle/common/metrics/GRPCMetrics.java | 114 ++-------------------
 .../uniffle/common/metrics/NettyMetrics.java       |  48 +++++++++
 .../metrics/{GRPCMetrics.java => RPCMetrics.java}  |  63 ++----------
 .../org/apache/uniffle/server/ShuffleServer.java   |  11 ++
 .../uniffle/server/ShuffleServerNettyMetrics.java  |  98 ++++++++++++++++++
 .../server/netty/ShuffleServerNettyHandler.java    |  28 ++---
 .../apache/uniffle/server/netty/StreamServer.java  |   4 +
 .../server/netty/StreamServerMetricHandler.java    |  52 ++++++++++
 .../uniffle/server/ShuffleServerMetricsTest.java   |  10 ++
 9 files changed, 253 insertions(+), 175 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 6a066e15..99786c1e 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -17,18 +17,12 @@
 
 package org.apache.uniffle.common.metrics;
 
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
-import io.prometheus.client.Summary;
 
 import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.JavaUtils;
 
-public abstract class GRPCMetrics {
+public abstract class GRPCMetrics extends RPCMetrics {
   // Grpc server internal executor metrics
   public static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY =
       "grpcServerExecutorActiveThreads";
@@ -43,34 +37,17 @@ public abstract class GRPCMetrics {
   private static final String GRPC_OPEN = "grpc_open";
   private static final String GRPC_TOTAL = "grpc_total";
 
-  private boolean isRegistered = false;
-  protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
-  protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
-  protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
-  protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();
   protected Gauge.Child gaugeGrpcOpen;
   protected Counter.Child counterGrpcTotal;
-  protected MetricsManager metricsManager;
-  protected String tags;
 
   public GRPCMetrics(String tags) {
-    this.tags = tags;
+    super(tags);
   }
 
   public abstract void registerMetrics();
 
-  public void register(CollectorRegistry collectorRegistry) {
-    if (!isRegistered) {
-      Map<String, String> labels = Maps.newHashMap();
-      labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
-      metricsManager = new MetricsManager(collectorRegistry, labels);
-      registerGeneralMetrics();
-      registerMetrics();
-      isRegistered = true;
-    }
-  }
-
-  private void registerGeneralMetrics() {
+  @Override
+  public void registerGeneralMetrics() {
     gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
     counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
     gaugeMap.putIfAbsent(
@@ -84,51 +61,10 @@ public abstract class GRPCMetrics {
         metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
   }
 
-  public void setGauge(String tag, double value) {
-    if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(tag);
-      if (gauge != null) {
-        gauge.set(value);
-      }
-    }
-  }
-
-  public void incGauge(String tag) {
-    incGauge(tag, 1);
-  }
-
-  public void incGauge(String tag, double value) {
-    if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(tag);
-      if (gauge != null) {
-        gauge.inc(value);
-      }
-    }
-  }
-
-  public void decGauge(String tag) {
-    decGauge(tag, 1);
-  }
-
-  public void decGauge(String tag, double value) {
-    if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(tag);
-      if (gauge != null) {
-        gauge.dec(value);
-      }
-    }
-  }
-
+  @Override
   public void incCounter(String methodName) {
     if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(methodName);
-      if (gauge != null) {
-        gauge.inc();
-      }
-      Counter.Child counter = counterMap.get(methodName);
-      if (counter != null) {
-        counter.inc();
-      }
+      super.incCounter(methodName);
       gaugeGrpcOpen.inc();
       counterGrpcTotal.inc();
     }
@@ -136,40 +72,12 @@ public abstract class GRPCMetrics {
 
+  @Override
   public void decCounter(String methodName) {
     if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(methodName);
-      if (gauge != null) {
-        gauge.dec();
-      }
+      super.decCounter(methodName);
       gaugeGrpcOpen.dec();
     }
   }
 
-  public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
-    Summary.Child summary = transportTimeSummaryMap.get(methodName);
-    if (summary != null) {
-      summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
-    }
-  }
-
-  public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
-    Summary.Child summary = processTimeSummaryMap.get(methodName);
-    if (summary != null) {
-      summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
-    }
-  }
-
-  public CollectorRegistry getCollectorRegistry() {
-    return metricsManager.getCollectorRegistry();
-  }
-
-  public Map<String, Counter.Child> getCounterMap() {
-    return counterMap;
-  }
-
-  public Map<String, Gauge.Child> getGaugeMap() {
-    return gaugeMap;
-  }
-
   public Gauge.Child getGaugeGrpcOpen() {
     return gaugeGrpcOpen;
   }
@@ -178,14 +86,6 @@ public abstract class GRPCMetrics {
     return counterGrpcTotal;
   }
 
-  public Map<String, Summary.Child> getTransportTimeSummaryMap() {
-    return transportTimeSummaryMap;
-  }
-
-  public Map<String, Summary.Child> getProcessTimeSummaryMap() {
-    return processTimeSummaryMap;
-  }
-
   public static GRPCMetrics getEmptyGRPCMetrics() {
     return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
   }
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
new file mode 100644
index 00000000..6ba4fcc0
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+
+/**
+ * Base class for Netty server metrics. On top of the per-request maps inherited from {@link
+ * RPCMetrics}, it registers two general metrics: a gauge of active connections and a counter of
+ * Netty handler exceptions.
+ */
+public abstract class NettyMetrics extends RPCMetrics {
+
+  private static final String NETTY_ACTIVE_CONNECTION = "netty_active_connection";
+  private static final String NETTY_HANDLE_EXCEPTION = "netty_handle_exception";
+
+  protected Gauge.Child gaugeNettyActiveConn;
+  protected Counter.Child counterNettyException;
+
+  public NettyMetrics(String tags) {
+    super(tags);
+  }
+
+  @Override
+  public void registerGeneralMetrics() {
+    gaugeNettyActiveConn = metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
+    counterNettyException = metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);
+  }
+
+  /** Returns the exception counter; {@code null} until {@link #registerGeneralMetrics()} runs. */
+  public Counter.Child getCounterNettyException() {
+    return counterNettyException;
+  }
+
+  /** Returns the connection gauge; {@code null} until {@link #registerGeneralMetrics()} runs. */
+  public Gauge.Child getGaugeNettyActiveConn() {
+    return gaugeNettyActiveConn;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
similarity index 63%
copy from common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
copy to common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
index 6a066e15..819ebdee 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
@@ -28,37 +28,28 @@ import io.prometheus.client.Summary;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 
-public abstract class GRPCMetrics {
-  // Grpc server internal executor metrics
-  public static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY =
-      "grpcServerExecutorActiveThreads";
-  private static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS =
-      "grpc_server_executor_active_threads";
-  public static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY =
-      "grpcServerExecutorBlockingQueueSize";
-  private static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE =
-      "grpc_server_executor_blocking_queue_size";
-  public static final String GRPC_SERVER_CONNECTION_NUMBER_KEY = "grpcServerConnectionNumber";
-  private static final String GRPC_SERVER_CONNECTION_NUMBER = "grpc_server_connection_number";
-  private static final String GRPC_OPEN = "grpc_open";
-  private static final String GRPC_TOTAL = "grpc_total";
-
-  private boolean isRegistered = false;
+/** Base class for RPC server metrics: per-key gauges, counters and latency summaries. */
+public abstract class RPCMetrics {
+  protected boolean isRegistered = false;
   protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
   protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
   protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
   protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();
-  protected Gauge.Child gaugeGrpcOpen;
-  protected Counter.Child counterGrpcTotal;
   protected MetricsManager metricsManager;
   protected String tags;
 
-  public GRPCMetrics(String tags) {
+  public RPCMetrics(String tags) {
     this.tags = tags;
   }
 
   public abstract void registerMetrics();
 
+  /**
+   * Registers the metrics common to every server of a transport (e.g. connection gauges). Called
+   * by {@link #register(CollectorRegistry)} before {@link #registerMetrics()}.
+   */
+  public abstract void registerGeneralMetrics();
+
   public void register(CollectorRegistry collectorRegistry) {
     if (!isRegistered) {
       Map<String, String> labels = Maps.newHashMap();
@@ -70,20 +61,6 @@ public abstract class GRPCMetrics {
     }
   }
 
-  private void registerGeneralMetrics() {
-    gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
-    counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
-    gaugeMap.putIfAbsent(
-        GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
-        metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS));
-    gaugeMap.putIfAbsent(
-        GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
-        metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE));
-    gaugeMap.putIfAbsent(
-        GRPC_SERVER_CONNECTION_NUMBER_KEY,
-        metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
-  }
-
   public void setGauge(String tag, double value) {
     if (isRegistered) {
       Gauge.Child gauge = gaugeMap.get(tag);
@@ -119,28 +96,25 @@ public abstract class GRPCMetrics {
       }
     }
 
-  public void incCounter(String methodName) {
+  public void incCounter(String metricKey) {
     if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(methodName);
+      Gauge.Child gauge = gaugeMap.get(metricKey);
       if (gauge != null) {
         gauge.inc();
       }
-      Counter.Child counter = counterMap.get(methodName);
+      Counter.Child counter = counterMap.get(metricKey);
       if (counter != null) {
         counter.inc();
       }
-      gaugeGrpcOpen.inc();
-      counterGrpcTotal.inc();
     }
   }
 
-  public void decCounter(String methodName) {
+  public void decCounter(String metricKey) {
     if (isRegistered) {
-      Gauge.Child gauge = gaugeMap.get(methodName);
+      Gauge.Child gauge = gaugeMap.get(metricKey);
       if (gauge != null) {
         gauge.dec();
       }
-      gaugeGrpcOpen.dec();
     }
   }
 
@@ -170,14 +144,6 @@ public abstract class GRPCMetrics {
     return gaugeMap;
   }
 
-  public Gauge.Child getGaugeGrpcOpen() {
-    return gaugeGrpcOpen;
-  }
-
-  public Counter.Child getCounterGrpcTotal() {
-    return counterGrpcTotal;
-  }
-
   public Map<String, Summary.Child> getTransportTimeSummaryMap() {
     return transportTimeSummaryMap;
   }
@@ -185,8 +151,4 @@ public abstract class GRPCMetrics {
   public Map<String, Summary.Child> getProcessTimeSummaryMap() {
     return processTimeSummaryMap;
   }
-
-  public static GRPCMetrics getEmptyGRPCMetrics() {
-    return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
-  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 6770c3d9..3e953d07 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
 import org.apache.uniffle.common.metrics.MetricReporter;
 import org.apache.uniffle.common.metrics.MetricReporterFactory;
+import org.apache.uniffle.common.metrics.NettyMetrics;
 import org.apache.uniffle.common.rpc.ServerInterface;
 import org.apache.uniffle.common.security.SecurityConfig;
 import org.apache.uniffle.common.security.SecurityContextFactory;
@@ -86,6 +87,7 @@ public class ShuffleServer {
   private HealthCheck healthCheck;
   private Set<String> tags = Sets.newHashSet();
   private GRPCMetrics grpcMetrics;
+  private NettyMetrics nettyMetrics;
   private MetricReporter metricReporter;
 
   private AtomicReference<ServerStatus> serverStatus = new AtomicReference(ServerStatus.ACTIVE);
@@ -214,6 +216,8 @@ public class ShuffleServer {
         ShuffleServerMetrics.getCollectorRegistry());
     jettyServer.registerInstance(
         CollectorRegistry.class.getCanonicalName() + "#grpc", grpcMetrics.getCollectorRegistry());
+    jettyServer.registerInstance(
+        CollectorRegistry.class.getCanonicalName() + "#netty", nettyMetrics.getCollectorRegistry());
     jettyServer.registerInstance(
         CollectorRegistry.class.getCanonicalName() + "#jvm", JvmMetrics.getCollectorRegistry());
     jettyServer.registerInstance(
@@ -221,6 +225,7 @@ public class ShuffleServer {
         new CoalescedCollectorRegistry(
             ShuffleServerMetrics.getCollectorRegistry(),
             grpcMetrics.getCollectorRegistry(),
+            nettyMetrics.getCollectorRegistry(),
             JvmMetrics.getCollectorRegistry()));
 
     SecurityConfig securityConfig = null;
@@ -290,6 +295,8 @@ public class ShuffleServer {
     ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
     grpcMetrics = new ShuffleServerGrpcMetrics(tags);
     grpcMetrics.register(new CollectorRegistry(true));
+    nettyMetrics = new ShuffleServerNettyMetrics(tags);
+    nettyMetrics.register(new CollectorRegistry(true));
     CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
     boolean verbose =
         shuffleServerConf.getBoolean(ShuffleServerConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
@@ -465,6 +472,11 @@ public class ShuffleServer {
     return grpcMetrics;
   }
 
+  /** Returns the Netty transport metrics, registered in its own registry next to the gRPC ones. */
+  public NettyMetrics getNettyMetrics() {
+    return nettyMetrics;
+  }
+
   public boolean isDecommissioning() {
     return ServerStatus.DECOMMISSIONING.equals(serverStatus.get())
         || ServerStatus.DECOMMISSIONED.equals(serverStatus.get());
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
new file mode 100644
index 00000000..afd242dc
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import org.apache.uniffle.common.metrics.NettyMetrics;
+import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
+import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
+import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
+import org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest;
+
+/**
+ * Netty metrics of the shuffle server. For each supported request type it registers a gauge, a
+ * {@code _total} counter and transport/process latency summaries, all keyed by the request's
+ * fully qualified class name.
+ */
+public class ShuffleServerNettyMetrics extends NettyMetrics {
+
+  private static final String TRANSPORT_LATENCY_SUFFIX = "_transport_latency";
+  private static final String PROCESS_LATENCY_SUFFIX = "_process_latency";
+  private static final String TOTAL_SUFFIX = "_total";
+  private static final String NETTY_SEND_SHUFFLE_DATA_REQUEST = "netty_send_shuffle_data_request";
+  private static final String NETTY_GET_LOCAL_SHUFFLE_DATA_REQUEST =
+      "netty_get_local_shuffle_data_request";
+  private static final String NETTY_GET_LOCAL_SHUFFLE_INDEX_REQUEST =
+      "netty_get_local_shuffle_index_request";
+  private static final String NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST =
+      "netty_get_memory_shuffle_data_request";
+
+  public ShuffleServerNettyMetrics(String tags) {
+    super(tags);
+  }
+
+  @Override
+  public void registerMetrics() {
+    registerRequestMetrics(SendShuffleDataRequest.class, NETTY_SEND_SHUFFLE_DATA_REQUEST);
+    registerRequestMetrics(GetLocalShuffleDataRequest.class, NETTY_GET_LOCAL_SHUFFLE_DATA_REQUEST);
+    registerRequestMetrics(
+        GetLocalShuffleIndexRequest.class, NETTY_GET_LOCAL_SHUFFLE_INDEX_REQUEST);
+    registerRequestMetrics(
+        GetMemoryShuffleDataRequest.class, NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST);
+  }
+
+  /**
+   * Registers the gauge, counter and both latency summaries of one request type.
+   *
+   * @param requestClass request type whose class name is used as the lookup key
+   * @param metricName base metric name; suffixes are appended for the counter and summaries
+   */
+  private void registerRequestMetrics(Class<?> requestClass, String metricName) {
+    String metricKey = requestClass.getName();
+    gaugeMap.putIfAbsent(metricKey, metricsManager.addLabeledGauge(metricName));
+    counterMap.putIfAbsent(metricKey, metricsManager.addLabeledCounter(metricName + TOTAL_SUFFIX));
+    transportTimeSummaryMap.putIfAbsent(
+        metricKey, metricsManager.addLabeledSummary(metricName + TRANSPORT_LATENCY_SUFFIX));
+    processTimeSummaryMap.putIfAbsent(
+        metricKey, metricsManager.addLabeledSummary(metricName + PROCESS_LATENCY_SUFFIX));
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 6abbd16e..c4fb409f 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -52,7 +52,6 @@ import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.server.ShuffleServerGrpcMetrics;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskManager;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
@@ -72,6 +71,18 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
 
   @Override
   public void receive(TransportClient client, RequestMessage msg) {
+    String metricKey = msg.getClass().getName();
+    shuffleServer.getNettyMetrics().incCounter(metricKey);
+    try {
+      dispatch(client, msg);
+    } finally {
+      // Undo the increment even when a handler throws, so the per-request gauge cannot drift up.
+      shuffleServer.getNettyMetrics().decCounter(metricKey);
+    }
+  }
+
+  /** Routes {@code msg} to the handler for its concrete request type. */
+  private void dispatch(TransportClient client, RequestMessage msg) {
     if (msg instanceof SendShuffleDataRequest) {
       handleSendShuffleDataRequest(client, (SendShuffleDataRequest) msg);
     } else if (msg instanceof GetLocalShuffleDataRequest) {
@@ -107,8 +118,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
       long transportTime = System.currentTimeMillis() - timestamp;
       if (transportTime > 0) {
         shuffleServer
-            .getGrpcMetrics()
-            .recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
+            .getNettyMetrics()
+            .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
       }
     }
     int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
@@ -190,8 +201,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
       rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
       long costTime = System.currentTimeMillis() - start;
       shuffleServer
-          .getGrpcMetrics()
-          .recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
+          .getNettyMetrics()
+          .recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
       LOG.debug(
           "Cache Shuffle Data for appId["
               + appId
@@ -225,9 +236,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
       long transportTime = System.currentTimeMillis() - timestamp;
       if (transportTime > 0) {
         shuffleServer
-            .getGrpcMetrics()
-            .recordTransportTime(
-                ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
+            .getNettyMetrics()
+            .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
       }
     }
     long start = System.currentTimeMillis();
@@ -260,8 +270,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
         }
         long costTime = System.currentTimeMillis() - start;
         shuffleServer
-            .getGrpcMetrics()
-            .recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, costTime);
+            .getNettyMetrics()
+            .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(), costTime);
         LOG.info(
             "Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" + " data for {}",
             costTime,
@@ -390,8 +400,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
       long transportTime = System.currentTimeMillis() - timestamp;
       if (transportTime > 0) {
         shuffleServer
-            .getGrpcMetrics()
-            .recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
+            .getNettyMetrics()
+            .recordTransportTime(GetLocalShuffleDataRequest.class.getName(), transportTime);
       }
     }
     String storageType = shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE);
@@ -444,8 +454,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length);
         ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length);
         shuffleServer
-            .getGrpcMetrics()
-            .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
+            .getNettyMetrics()
+            .recordProcessTime(GetLocalShuffleDataRequest.class.getName(), readTime);
         LOG.info(
             "Successfully getShuffleData cost {} ms for shuffle" + " data with {}",
             readTime,
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index d4a6b53d..2c2e6892 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -93,6 +93,10 @@ public class StreamServer implements ServerInterface {
               @Override
               public void initChannel(final SocketChannel ch) {
                 transportContext.initializePipeline(ch, new TransportFrameDecoder());
+                ch.pipeline()
+                    .addLast(
+                        "metricHandler",
+                        new StreamServerMetricHandler(shuffleServer.getNettyMetrics()));
               }
             })
         .option(ChannelOption.SO_BACKLOG, backlogSize)
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/StreamServerMetricHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/StreamServerMetricHandler.java
new file mode 100644
index 00000000..3c8d9cf0
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServerMetricHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+
+import org.apache.uniffle.common.metrics.NettyMetrics;
+
+/**
+ * Keeps the Netty connection metrics up to date: increments the active-connection gauge when a
+ * channel becomes active, decrements it when the channel becomes inactive, and counts every
+ * exception passed to {@link #exceptionCaught}. Each event is then handed to the superclass.
+ *
+ * <p>The handler holds no per-channel state, hence {@link ChannelHandler.Sharable}. It assumes
+ * {@code nettyMetrics} is already registered; before that its getters return {@code null}.
+ */
+@ChannelHandler.Sharable
+public class StreamServerMetricHandler extends ChannelDuplexHandler {
+
+  private final NettyMetrics nettyMetrics;
+
+  public StreamServerMetricHandler(NettyMetrics nettyMetrics) {
+    this.nettyMetrics = nettyMetrics;
+  }
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    nettyMetrics.getGaugeNettyActiveConn().inc();
+    super.channelActive(ctx);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    nettyMetrics.getGaugeNettyActiveConn().dec();
+    super.channelInactive(ctx);
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    nettyMetrics.getCounterNettyException().inc();
+    super.exceptionCaught(ctx, cause);
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 3479003c..d4be783f 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -47,6 +47,7 @@ public class ShuffleServerMetricsTest {
   private static final String SERVER_METRICS_URL = "http://127.0.0.1:12345/metrics/server";
   private static final String SERVER_JVM_URL = "http://127.0.0.1:12345/metrics/jvm";
   private static final String SERVER_GRPC_URL = "http://127.0.0.1:12345/metrics/grpc";
+  private static final String SERVER_NETTY_URL = "http://127.0.0.1:12345/metrics/netty";
   private static final String REMOTE_STORAGE_PATH = "hdfs://hdfs1:9000/rss";
   private static final String STORAGE_HOST = "hdfs1";
   private static ShuffleServer shuffleServer;
@@ -194,6 +195,15 @@ public class ShuffleServerMetricsTest {
     assertEquals(69, actualObj.get("metrics").size());
   }
 
+  @Test
+  public void testNettyMetrics() throws Exception {
+    String content = TestUtils.httpGet(SERVER_NETTY_URL);
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode actualObj = mapper.readTree(content);
+    assertEquals(2, actualObj.size());
+    assertEquals(66, actualObj.get("metrics").size());
+  }
+
   @Test
   public void testServerMetricsConcurrently() throws Exception {
     ExecutorService executorService = Executors.newFixedThreadPool(3);

Reply via email to