This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a5ba479f [#640] feat(netty): Metric system for netty server (#1041)
a5ba479f is described below
commit a5ba479f3967a4732616a0b352b1882f1dc85a87
Author: xumanbu <[email protected]>
AuthorDate: Fri Jul 28 10:39:04 2023 +0800
[#640] feat(netty): Metric system for netty server (#1041)
### What changes were proposed in this pull request?
Add NettyMetrics for StreamServer.
### Why are the changes needed?
Fix: #640
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
Co-authored-by: jam.xu <[email protected]>
---
.../apache/uniffle/common/metrics/GRPCMetrics.java | 114 ++-------------------
.../uniffle/common/metrics/NettyMetrics.java | 48 +++++++++
.../metrics/{GRPCMetrics.java => RPCMetrics.java} | 63 ++----------
.../org/apache/uniffle/server/ShuffleServer.java | 11 ++
.../uniffle/server/ShuffleServerNettyMetrics.java | 98 ++++++++++++++++++
.../server/netty/ShuffleServerNettyHandler.java | 28 ++---
.../apache/uniffle/server/netty/StreamServer.java | 4 +
.../server/netty/StreamServerMetricHandler.java | 52 ++++++++++
.../uniffle/server/ShuffleServerMetricsTest.java | 10 ++
9 files changed, 253 insertions(+), 175 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 6a066e15..99786c1e 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -17,18 +17,12 @@
package org.apache.uniffle.common.metrics;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
-import io.prometheus.client.Summary;
import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.JavaUtils;
-public abstract class GRPCMetrics {
+public abstract class GRPCMetrics extends RPCMetrics {
// Grpc server internal executor metrics
public static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY =
"grpcServerExecutorActiveThreads";
@@ -43,34 +37,17 @@ public abstract class GRPCMetrics {
private static final String GRPC_OPEN = "grpc_open";
private static final String GRPC_TOTAL = "grpc_total";
- private boolean isRegistered = false;
- protected Map<String, Counter.Child> counterMap =
JavaUtils.newConcurrentMap();
- protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
- protected Map<String, Summary.Child> transportTimeSummaryMap =
JavaUtils.newConcurrentMap();
- protected Map<String, Summary.Child> processTimeSummaryMap =
JavaUtils.newConcurrentMap();
protected Gauge.Child gaugeGrpcOpen;
protected Counter.Child counterGrpcTotal;
- protected MetricsManager metricsManager;
- protected String tags;
public GRPCMetrics(String tags) {
- this.tags = tags;
+ super(tags);
}
public abstract void registerMetrics();
- public void register(CollectorRegistry collectorRegistry) {
- if (!isRegistered) {
- Map<String, String> labels = Maps.newHashMap();
- labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
- metricsManager = new MetricsManager(collectorRegistry, labels);
- registerGeneralMetrics();
- registerMetrics();
- isRegistered = true;
- }
- }
-
- private void registerGeneralMetrics() {
+ @Override
+ public void registerGeneralMetrics() {
gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
gaugeMap.putIfAbsent(
@@ -84,51 +61,9 @@ public abstract class GRPCMetrics {
metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
}
- public void setGauge(String tag, double value) {
- if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(tag);
- if (gauge != null) {
- gauge.set(value);
- }
- }
- }
-
- public void incGauge(String tag) {
- incGauge(tag, 1);
- }
-
- public void incGauge(String tag, double value) {
- if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(tag);
- if (gauge != null) {
- gauge.inc(value);
- }
- }
- }
-
- public void decGauge(String tag) {
- decGauge(tag, 1);
- }
-
- public void decGauge(String tag, double value) {
- if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(tag);
- if (gauge != null) {
- gauge.dec(value);
- }
- }
- }
-
public void incCounter(String methodName) {
if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(methodName);
- if (gauge != null) {
- gauge.inc();
- }
- Counter.Child counter = counterMap.get(methodName);
- if (counter != null) {
- counter.inc();
- }
+ super.incCounter(methodName);
gaugeGrpcOpen.inc();
counterGrpcTotal.inc();
}
@@ -136,40 +71,11 @@ public abstract class GRPCMetrics {
public void decCounter(String methodName) {
if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(methodName);
- if (gauge != null) {
- gauge.dec();
- }
+ super.decCounter(methodName);
gaugeGrpcOpen.dec();
}
}
- public void recordTransportTime(String methodName, long
transportTimeInMillionSecond) {
- Summary.Child summary = transportTimeSummaryMap.get(methodName);
- if (summary != null) {
- summary.observe(transportTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND);
- }
- }
-
- public void recordProcessTime(String methodName, long
processTimeInMillionSecond) {
- Summary.Child summary = processTimeSummaryMap.get(methodName);
- if (summary != null) {
- summary.observe(processTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND);
- }
- }
-
- public CollectorRegistry getCollectorRegistry() {
- return metricsManager.getCollectorRegistry();
- }
-
- public Map<String, Counter.Child> getCounterMap() {
- return counterMap;
- }
-
- public Map<String, Gauge.Child> getGaugeMap() {
- return gaugeMap;
- }
-
public Gauge.Child getGaugeGrpcOpen() {
return gaugeGrpcOpen;
}
@@ -178,14 +84,6 @@ public abstract class GRPCMetrics {
return counterGrpcTotal;
}
- public Map<String, Summary.Child> getTransportTimeSummaryMap() {
- return transportTimeSummaryMap;
- }
-
- public Map<String, Summary.Child> getProcessTimeSummaryMap() {
- return processTimeSummaryMap;
- }
-
public static GRPCMetrics getEmptyGRPCMetrics() {
return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
new file mode 100644
index 00000000..6ba4fcc0
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+
+/**
+ * Base metrics for the Netty-based RPC server.
+ *
+ * <p>In addition to the per-request gauges, counters and latency summaries managed by {@link
+ * RPCMetrics}, this class registers two server-wide metrics: a gauge of active connections
+ * ({@code netty_active_connection}) and a counter of exceptions surfaced by the channel pipeline
+ * ({@code netty_handle_exception}). Subclasses register their request-specific metrics in {@link
+ * #registerMetrics()}.
+ */
+public abstract class NettyMetrics extends RPCMetrics {
+
+  private static final String NETTY_ACTIVE_CONNECTION = "netty_active_connection";
+  private static final String NETTY_HANDLE_EXCEPTION = "netty_handle_exception";
+
+  /** Gauge of currently active connections; {@code null} until registered. */
+  protected Gauge.Child gaugeNettyActiveConn;
+
+  /** Counter of exceptions caught in the channel pipeline; {@code null} until registered. */
+  protected Counter.Child counterNettyException;
+
+  /**
+   * Creates the metrics holder; nothing is registered until {@code register} is called.
+   *
+   * @param tags value of the tag label attached to every metric of this instance
+   */
+  public NettyMetrics(String tags) {
+    super(tags);
+  }
+
+  /** Registers the server-wide Netty metrics (active connections and handled exceptions). */
+  @Override
+  public void registerGeneralMetrics() {
+    gaugeNettyActiveConn = metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
+    counterNettyException = metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);
+  }
+
+  /** Returns the exception counter; {@code null} before registration. */
+  public Counter.Child getCounterNettyException() {
+    return counterNettyException;
+  }
+
+  /** Returns the active-connection gauge; {@code null} before registration. */
+  public Gauge.Child getGaugeNettyActiveConn() {
+    return gaugeNettyActiveConn;
+  }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
similarity index 63%
copy from
common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
copy to common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
index 6a066e15..819ebdee 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
@@ -28,37 +28,23 @@ import io.prometheus.client.Summary;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
-public abstract class GRPCMetrics {
- // Grpc server internal executor metrics
- public static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY =
- "grpcServerExecutorActiveThreads";
- private static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS =
- "grpc_server_executor_active_threads";
- public static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY =
- "grpcServerExecutorBlockingQueueSize";
- private static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE =
- "grpc_server_executor_blocking_queue_size";
- public static final String GRPC_SERVER_CONNECTION_NUMBER_KEY =
"grpcServerConnectionNumber";
- private static final String GRPC_SERVER_CONNECTION_NUMBER =
"grpc_server_connection_number";
- private static final String GRPC_OPEN = "grpc_open";
- private static final String GRPC_TOTAL = "grpc_total";
-
- private boolean isRegistered = false;
+public abstract class RPCMetrics {
+ protected boolean isRegistered = false;
protected Map<String, Counter.Child> counterMap =
JavaUtils.newConcurrentMap();
protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> transportTimeSummaryMap =
JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> processTimeSummaryMap =
JavaUtils.newConcurrentMap();
- protected Gauge.Child gaugeGrpcOpen;
- protected Counter.Child counterGrpcTotal;
protected MetricsManager metricsManager;
protected String tags;
- public GRPCMetrics(String tags) {
+ public RPCMetrics(String tags) {
this.tags = tags;
}
public abstract void registerMetrics();
+ public abstract void registerGeneralMetrics();
+
public void register(CollectorRegistry collectorRegistry) {
if (!isRegistered) {
Map<String, String> labels = Maps.newHashMap();
@@ -70,20 +56,6 @@ public abstract class GRPCMetrics {
}
}
- private void registerGeneralMetrics() {
- gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
- counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
- gaugeMap.putIfAbsent(
- GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
- metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS));
- gaugeMap.putIfAbsent(
- GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
-
metricsManager.addLabeledGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE));
- gaugeMap.putIfAbsent(
- GRPC_SERVER_CONNECTION_NUMBER_KEY,
- metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
- }
-
public void setGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
@@ -119,28 +91,25 @@ public abstract class GRPCMetrics {
}
}
- public void incCounter(String methodName) {
+ public void incCounter(String metricKey) {
if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(methodName);
+ Gauge.Child gauge = gaugeMap.get(metricKey);
if (gauge != null) {
gauge.inc();
}
- Counter.Child counter = counterMap.get(methodName);
+ Counter.Child counter = counterMap.get(metricKey);
if (counter != null) {
counter.inc();
}
- gaugeGrpcOpen.inc();
- counterGrpcTotal.inc();
}
}
- public void decCounter(String methodName) {
+ public void decCounter(String metricKey) {
if (isRegistered) {
- Gauge.Child gauge = gaugeMap.get(methodName);
+ Gauge.Child gauge = gaugeMap.get(metricKey);
if (gauge != null) {
gauge.dec();
}
- gaugeGrpcOpen.dec();
}
}
@@ -170,14 +139,6 @@ public abstract class GRPCMetrics {
return gaugeMap;
}
- public Gauge.Child getGaugeGrpcOpen() {
- return gaugeGrpcOpen;
- }
-
- public Counter.Child getCounterGrpcTotal() {
- return counterGrpcTotal;
- }
-
public Map<String, Summary.Child> getTransportTimeSummaryMap() {
return transportTimeSummaryMap;
}
@@ -185,8 +146,4 @@ public abstract class GRPCMetrics {
public Map<String, Summary.Child> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}
-
- public static GRPCMetrics getEmptyGRPCMetrics() {
- return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
- }
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 6770c3d9..3e953d07 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.metrics.JvmMetrics;
import org.apache.uniffle.common.metrics.MetricReporter;
import org.apache.uniffle.common.metrics.MetricReporterFactory;
+import org.apache.uniffle.common.metrics.NettyMetrics;
import org.apache.uniffle.common.rpc.ServerInterface;
import org.apache.uniffle.common.security.SecurityConfig;
import org.apache.uniffle.common.security.SecurityContextFactory;
@@ -86,6 +87,7 @@ public class ShuffleServer {
private HealthCheck healthCheck;
private Set<String> tags = Sets.newHashSet();
private GRPCMetrics grpcMetrics;
+ private NettyMetrics nettyMetrics;
private MetricReporter metricReporter;
private AtomicReference<ServerStatus> serverStatus = new
AtomicReference(ServerStatus.ACTIVE);
@@ -214,6 +216,8 @@ public class ShuffleServer {
ShuffleServerMetrics.getCollectorRegistry());
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#grpc",
grpcMetrics.getCollectorRegistry());
+ jettyServer.registerInstance(
+ CollectorRegistry.class.getCanonicalName() + "#netty",
nettyMetrics.getCollectorRegistry());
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#jvm",
JvmMetrics.getCollectorRegistry());
jettyServer.registerInstance(
@@ -221,6 +225,7 @@ public class ShuffleServer {
new CoalescedCollectorRegistry(
ShuffleServerMetrics.getCollectorRegistry(),
grpcMetrics.getCollectorRegistry(),
+ nettyMetrics.getCollectorRegistry(),
JvmMetrics.getCollectorRegistry()));
SecurityConfig securityConfig = null;
@@ -290,6 +295,8 @@ public class ShuffleServer {
ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
grpcMetrics = new ShuffleServerGrpcMetrics(tags);
grpcMetrics.register(new CollectorRegistry(true));
+ nettyMetrics = new ShuffleServerNettyMetrics(tags);
+ nettyMetrics.register(new CollectorRegistry(true));
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
boolean verbose =
shuffleServerConf.getBoolean(ShuffleServerConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
@@ -465,6 +472,10 @@ public class ShuffleServer {
return grpcMetrics;
}
+ public NettyMetrics getNettyMetrics() {
+ return nettyMetrics;
+ }
+
public boolean isDecommissioning() {
return ServerStatus.DECOMMISSIONING.equals(serverStatus.get())
|| ServerStatus.DECOMMISSIONED.equals(serverStatus.get());
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
new file mode 100644
index 00000000..afd242dc
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import org.apache.uniffle.common.metrics.NettyMetrics;
+import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
+import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
+import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
+import org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest;
+
+/**
+ * Netty request metrics of the shuffle server.
+ *
+ * <p>For every supported request type this registers an in-flight gauge, a {@code _total}
+ * counter and transport/process latency summaries, all keyed by the request's fully qualified
+ * class name so that handlers can look them up with {@code msg.getClass().getName()}.
+ */
+public class ShuffleServerNettyMetrics extends NettyMetrics {
+
+  private static final String TRANSPORT_LATENCY_SUFFIX = "_transport_latency";
+  private static final String PROCESS_LATENCY_SUFFIX = "_process_latency";
+  private static final String TOTAL_SUFFIX = "_total";
+  private static final String NETTY_SEND_SHUFFLE_DATA_REQUEST = "netty_send_shuffle_data_request";
+  private static final String NETTY_GET_LOCAL_SHUFFLE_DATA_REQUEST =
+      "netty_get_local_shuffle_data_request";
+  private static final String NETTY_GET_LOCAL_SHUFFLE_INDEX_REQUEST =
+      "netty_get_local_shuffle_index_request";
+  private static final String NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST =
+      "netty_get_memory_shuffle_data_request";
+
+  /**
+   * Creates the metrics holder; nothing is registered until {@code register} is called.
+   *
+   * @param tags value of the tag label attached to every metric of this instance
+   */
+  public ShuffleServerNettyMetrics(String tags) {
+    super(tags);
+  }
+
+  /** Registers the per-request metrics of every Netty request type tracked by the server. */
+  @Override
+  public void registerMetrics() {
+    registerRequestMetrics(SendShuffleDataRequest.class, NETTY_SEND_SHUFFLE_DATA_REQUEST);
+    registerRequestMetrics(
+        GetLocalShuffleDataRequest.class, NETTY_GET_LOCAL_SHUFFLE_DATA_REQUEST);
+    registerRequestMetrics(
+        GetLocalShuffleIndexRequest.class, NETTY_GET_LOCAL_SHUFFLE_INDEX_REQUEST);
+    registerRequestMetrics(
+        GetMemoryShuffleDataRequest.class, NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST);
+  }
+
+  /**
+   * Registers the in-flight gauge, {@code _total} counter and latency summaries of one request.
+   *
+   * @param requestClass request type whose fully qualified name keys every metric map
+   * @param metricName base metric name the suffixes are appended to
+   */
+  private void registerRequestMetrics(Class<?> requestClass, String metricName) {
+    String key = requestClass.getName();
+    gaugeMap.putIfAbsent(key, metricsManager.addLabeledGauge(metricName));
+    counterMap.putIfAbsent(key, metricsManager.addLabeledCounter(metricName + TOTAL_SUFFIX));
+    transportTimeSummaryMap.putIfAbsent(
+        key, metricsManager.addLabeledSummary(metricName + TRANSPORT_LATENCY_SUFFIX));
+    processTimeSummaryMap.putIfAbsent(
+        key, metricsManager.addLabeledSummary(metricName + PROCESS_LATENCY_SUFFIX));
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 6abbd16e..c4fb409f 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -52,7 +52,6 @@ import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.server.ShuffleServerGrpcMetrics;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
@@ -72,6 +71,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
@Override
public void receive(TransportClient client, RequestMessage msg) {
+ shuffleServer.getNettyMetrics().incCounter(msg.getClass().getName());
if (msg instanceof SendShuffleDataRequest) {
handleSendShuffleDataRequest(client, (SendShuffleDataRequest) msg);
} else if (msg instanceof GetLocalShuffleDataRequest) {
@@ -83,6 +83,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
} else {
throw new RssException("Can not handle message " + msg.type());
}
+ shuffleServer.getNettyMetrics().decCounter(msg.getClass().getName());
}
@Override
@@ -107,8 +108,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
- .getGrpcMetrics()
-
.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
transportTime);
+ .getNettyMetrics()
+ .recordTransportTime(SendShuffleDataRequest.class.getName(),
transportTime);
}
}
int requireSize =
shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
@@ -190,8 +191,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
long costTime = System.currentTimeMillis() - start;
shuffleServer
- .getGrpcMetrics()
-
.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
+ .getNettyMetrics()
+ .recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
LOG.debug(
"Cache Shuffle Data for appId["
+ appId
@@ -225,9 +226,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
- .getGrpcMetrics()
- .recordTransportTime(
- ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD,
transportTime);
+ .getNettyMetrics()
+ .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(),
transportTime);
}
}
long start = System.currentTimeMillis();
@@ -260,8 +260,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
long costTime = System.currentTimeMillis() - start;
shuffleServer
- .getGrpcMetrics()
-
.recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD,
costTime);
+ .getNettyMetrics()
+ .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(),
costTime);
LOG.info(
"Successfully getInMemoryShuffleData cost {} ms with {} bytes
shuffle" + " data for {}",
costTime,
@@ -390,8 +390,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
- .getGrpcMetrics()
-
.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD,
transportTime);
+ .getNettyMetrics()
+ .recordTransportTime(GetLocalShuffleDataRequest.class.getName(),
transportTime);
}
}
String storageType =
shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE);
@@ -444,8 +444,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length);
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length);
shuffleServer
- .getGrpcMetrics()
-
.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
+ .getNettyMetrics()
+ .recordProcessTime(GetLocalShuffleDataRequest.class.getName(),
readTime);
LOG.info(
"Successfully getShuffleData cost {} ms for shuffle" + " data with
{}",
readTime,
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index d4a6b53d..2c2e6892 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -93,6 +93,10 @@ public class StreamServer implements ServerInterface {
@Override
public void initChannel(final SocketChannel ch) {
transportContext.initializePipeline(ch, new
TransportFrameDecoder());
+ ch.pipeline()
+ .addLast(
+ "metricHandler",
+ new
StreamServerMetricHandler(shuffleServer.getNettyMetrics()));
}
})
.option(ChannelOption.SO_BACKLOG, backlogSize)
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServerMetricHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServerMetricHandler.java
new file mode 100644
index 00000000..3c8d9cf0
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServerMetricHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+
+import org.apache.uniffle.common.metrics.NettyMetrics;
+
+/**
+ * Channel handler that feeds the connection-level {@link NettyMetrics}.
+ *
+ * <p>Tracks the number of active connections and counts exceptions that reach this handler; every
+ * event is then forwarded unchanged to the next handler in the pipeline. The handler keeps no
+ * per-channel state, so it is safe to mark it {@code @Sharable}.
+ *
+ * <p>NOTE(review): StreamServer appends this handler with {@code addLast} after the transport
+ * pipeline, so it only observes events that earlier handlers propagate; confirm that
+ * {@code exceptionCaught} actually reaches it.
+ */
+@ChannelHandler.Sharable
+public class StreamServerMetricHandler extends ChannelDuplexHandler {
+
+  private final NettyMetrics nettyMetrics;
+
+  /**
+   * @param nettyMetrics metrics to update; they must be registered before any channel becomes
+   *     active, otherwise the gauge/counter getters return {@code null}
+   */
+  public StreamServerMetricHandler(NettyMetrics nettyMetrics) {
+    this.nettyMetrics = nettyMetrics;
+  }
+
+  /** Increments the active-connection gauge, then propagates the event. */
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    nettyMetrics.getGaugeNettyActiveConn().inc();
+    super.channelActive(ctx);
+  }
+
+  /** Decrements the active-connection gauge, then propagates the event. */
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    nettyMetrics.getGaugeNettyActiveConn().dec();
+    super.channelInactive(ctx);
+  }
+
+  /** Counts the exception, then propagates it to the next handler. */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    nettyMetrics.getCounterNettyException().inc();
+    super.exceptionCaught(ctx, cause);
+  }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 3479003c..d4be783f 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -47,6 +47,7 @@ public class ShuffleServerMetricsTest {
private static final String SERVER_METRICS_URL =
"http://127.0.0.1:12345/metrics/server";
private static final String SERVER_JVM_URL =
"http://127.0.0.1:12345/metrics/jvm";
private static final String SERVER_GRPC_URL =
"http://127.0.0.1:12345/metrics/grpc";
+ private static final String SERVER_NETTY_URL =
"http://127.0.0.1:12345/metrics/netty";
private static final String REMOTE_STORAGE_PATH = "hdfs://hdfs1:9000/rss";
private static final String STORAGE_HOST = "hdfs1";
private static ShuffleServer shuffleServer;
@@ -194,6 +195,15 @@ public class ShuffleServerMetricsTest {
assertEquals(69, actualObj.get("metrics").size());
}
+ @Test
+ public void testNettyMetrics() throws Exception {
+ String content = TestUtils.httpGet(SERVER_NETTY_URL);
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(content);
+ assertEquals(2, actualObj.size());
+ assertEquals(66, actualObj.get("metrics").size());
+ }
+
@Test
public void testServerMetricsConcurrently() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);