This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7b631773e [#1686] feat(netty): Support pending tasks number metrics
for Netty EventLoopGroup (#1687)
7b631773e is described below
commit 7b631773e822773634c0c536f9aedc9676f4a3ea
Author: RickyMa <[email protected]>
AuthorDate: Sat May 11 11:14:18 2024 +0800
[#1686] feat(netty): Support pending tasks number metrics for Netty
EventLoopGroup (#1687)
### What changes were proposed in this pull request?
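Add two gauges, `netty_pending_tasks_num_for_boss_group` and `netty_pending_tasks_num_for_worker_group`, that report the number of pending tasks in the shuffle server's Netty boss and worker EventLoopGroups. The values are sampled periodically at an interval set by `rss.server.netty.metrics.pendingTaskNumPollingIntervalMs` (default 10000 ms).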
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1686.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
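Existing tests. The expected metric count in `ShuffleServerMetricsTest` was raised from 66 to 68 to account for the two new gauges.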
---
.../uniffle/common/metrics/NettyMetrics.java | 18 +++++
docs/server_guide.md | 77 +++++++++++-----------
.../apache/uniffle/server/ShuffleServerConf.java | 7 ++
.../apache/uniffle/server/netty/StreamServer.java | 45 +++++++++++++
.../uniffle/server/ShuffleServerMetricsTest.java | 2 +-
5 files changed, 110 insertions(+), 39 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
index 0879e2075..b41f57690 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
@@ -26,9 +26,15 @@ public abstract class NettyMetrics extends RPCMetrics {
private static final String NETTY_ACTIVE_CONNECTION =
"netty_active_connection";
private static final String NETTY_HANDLE_EXCEPTION =
"netty_handle_exception";
+ private static final String NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP =
+ "netty_pending_tasks_num_for_boss_group";
+ private static final String NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP =
+ "netty_pending_tasks_num_for_worker_group";
protected Gauge.Child gaugeNettyActiveConn;
protected Counter.Child counterNettyException;
+ protected Gauge.Child gaugeNettyPendingTasksNumForBossGroup;
+ protected Gauge.Child gaugeNettyPendingTasksNumForWorkerGroup;
public NettyMetrics(RssConf rssConf, String tags) {
super(rssConf, tags);
@@ -38,6 +44,10 @@ public abstract class NettyMetrics extends RPCMetrics {
public void registerGeneralMetrics() {
gaugeNettyActiveConn =
metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
counterNettyException =
metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);
+ gaugeNettyPendingTasksNumForBossGroup =
+ metricsManager.addLabeledGauge(NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP);
+ gaugeNettyPendingTasksNumForWorkerGroup =
+
metricsManager.addLabeledGauge(NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP);
}
public Counter.Child getCounterNettyException() {
@@ -47,4 +57,12 @@ public abstract class NettyMetrics extends RPCMetrics {
public Gauge.Child getGaugeNettyActiveConn() {
return gaugeNettyActiveConn;
}
+
+ public Gauge.Child getGaugeNettyPendingTasksNumForBossGroup() {
+ return gaugeNettyPendingTasksNumForBossGroup;
+ }
+
+ public Gauge.Child getGaugeNettyPendingTasksNumForWorkerGroup() {
+ return gaugeNettyPendingTasksNumForWorkerGroup;
+ }
}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 8799fc4a2..ec2d24fcc 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -69,44 +69,45 @@ This document will introduce how to deploy Uniffle shuffle
servers.
```
## Configuration
-| Property Name | Default
| Description
[...]
-|---------------------------------------------------------|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| rss.coordinator.quorum | -
| Coordinator quorum
[...]
-| rss.rpc.server.type | GRPC
| Shuffle server type,
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the server side for better stability and
performance.
[...]
-| rss.rpc.server.port | 19999
| RPC port for Shuffle
server, if set zero, grpc server start on random port.
[...]
-| rss.jetty.http.port | 19998
| Http port for Shuffle
server
[...]
-| rss.server.netty.port | -1
| Netty port for Shuffle
server, if set zero, Netty server start on random port.
[...]
-| rss.server.netty.epoll.enable | false
| Whether to enable epoll
model with Netty server.
[...]
-| rss.server.netty.accept.thread | 10
| Accept thread count in
netty.
[...]
-| rss.server.netty.worker.thread | 0
| Worker thread count in
netty. When set to 0, the default value is dynamically set to twice the number
of processor cores, but it will not be less than 100 to ensure the minimum
throughput of the service.
[...]
-| rss.server.netty.connect.backlog | 0
| For Netty server,
requested maximum length of the queue of incoming connections.
[...]
-| rss.server.netty.connect.timeout | 5000
| Timeout for connection in
netty.
[...]
-| rss.server.netty.receive.buf | 0
| Receive buffer size
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth =
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system
automatically estimates the receive buffer size based on default settings.
[...]
-| rss.server.netty.send.buf | 0
| Send buffer size
(SO_SNDBUF).
[...]
-| rss.server.buffer.capacity | -1
| Max memory of buffer
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used
[...]
-| rss.server.buffer.capacity.ratio | 0.6
| when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or
off-heap size(when enabling Netty) * ratio
[...]
-| rss.server.memory.shuffle.highWaterMark.percentage | 75.0
| Threshold of spill data
to storage, percentage of rss.server.buffer.capacity
[...]
-| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0
| Threshold of keep data in
memory, percentage of rss.server.buffer.capacity
[...]
-| rss.server.read.buffer.capacity | -1
| Max size of buffer for
reading data. If negative, JVM heap size * read.buffer.ratio is used
[...]
-| rss.server.read.buffer.capacity.ratio | 0.2
| when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size or off-heap size(when enabling Netty) * ratio
[...]
-| rss.server.heartbeat.interval | 10000
| Heartbeat interval to
Coordinator (ms)
[...]
-| rss.server.flush.localfile.threadPool.size | 10
| Thread pool for flush
data to local file
[...]
-| rss.server.flush.hadoop.threadPool.size | 60
| Thread pool for flush
data to hadoop storage
[...]
-| rss.server.commit.timeout | 600000
| Timeout when commit
shuffle data (ms)
[...]
-| rss.storage.type | -
| Supports
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS
[...]
-| rss.server.flush.cold.storage.threshold.size | 64M
| The threshold of data
size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used
[...]
-| rss.server.tags | -
| The comma-separated list
of tags to indicate the shuffle server's attributes. It will be used as the
assignment basis for the coordinator
[...]
-| rss.server.single.buffer.flush.enabled | true
| Whether single buffer
flush when size exceeded rss.server.single.buffer.flush.threshold
[...]
-| rss.server.single.buffer.flush.threshold | 128M
| The threshold of single
shuffle buffer flush
[...]
-| rss.server.disk.capacity | -1
| Disk capacity that
shuffle server can use. If negative, it will use disk whole space * ratio
[...]
-| rss.server.disk.capacity.ratio | 0.9
| When
`rss.server.disk.capacity` is negative, disk whole space * ratio is used
[...]
-| rss.server.hybrid.storage.fallback.strategy.class | -
| The fallback strategy for
`MEMORY_LOCALFILE_HDFS`. Support
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`.
If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackSt
[...]
-| rss.server.leak.shuffledata.check.interval | 3600000
| The interval of leak
shuffle data check (ms)
[...]
-| rss.server.max.concurrency.of.per-partition.write | 30
| The max concurrency of
single partition writer, the data partition file number is equal to this value.
Default value is 1. This config could improve the writing speed, especially for
huge partition.
[...]
-| rss.server.max.concurrency.limit.of.per-partition.write | -
| The limit for max
concurrency per-partition write specified by client, this won't be enabled by
default.
[...]
-| rss.metrics.reporter.class | -
| The class of metrics
reporter.
[...]
-| rss.server.hybrid.storage.manager.selector.class |
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is
`DefaultStorageManagerSelector`, and another
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's
data to cold storage.
[...]
-| rss.server.disk-capacity.watermark.check.enabled | false
| If it is co-located with
other services, the high-low watermark check based on the uniffle used is not
correct. Due to this, the whole disk capacity watermark check is necessary,
which will reuse the current watermark value. It will be disabled by default.
[...]
+| Property Name | Default
| Description
[...]
+|----------------------------------------------------------|------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| rss.coordinator.quorum | -
| Coordinator quorum
[...]
+| rss.rpc.server.type | GRPC
| Shuffle server type,
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the server side for better stability and
performance.
[...]
+| rss.rpc.server.port | 19999
| RPC port for Shuffle
server, if set zero, grpc server start on random port.
[...]
+| rss.jetty.http.port | 19998
| Http port for Shuffle
server
[...]
+| rss.server.netty.port | -1
| Netty port for Shuffle
server, if set zero, Netty server start on random port.
[...]
+| rss.server.netty.epoll.enable | false
| Whether to enable epoll
model with Netty server.
[...]
+| rss.server.netty.accept.thread | 10
| Accept thread count in
netty.
[...]
+| rss.server.netty.worker.thread | 0
| Worker thread count in
netty. When set to 0, the default value is dynamically set to twice the number
of processor cores, but it will not be less than 100 to ensure the minimum
throughput of the service.
[...]
+| rss.server.netty.connect.backlog | 0
| For Netty server,
requested maximum length of the queue of incoming connections.
[...]
+| rss.server.netty.connect.timeout | 5000
| Timeout for connection
in netty.
[...]
+| rss.server.netty.receive.buf | 0
| Receive buffer size
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth =
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system
automatically estimates the receive buffer size based on default settings.
[...]
+| rss.server.netty.send.buf | 0
| Send buffer size
(SO_SNDBUF).
[...]
+| rss.server.buffer.capacity | -1
| Max memory of buffer
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used
[...]
+| rss.server.buffer.capacity.ratio | 0.6
| when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or
off-heap size(when enabling Netty) * ratio
[...]
+| rss.server.memory.shuffle.highWaterMark.percentage | 75.0
| Threshold of spill data
to storage, percentage of rss.server.buffer.capacity
[...]
+| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0
| Threshold of keep data
in memory, percentage of rss.server.buffer.capacity
[...]
+| rss.server.read.buffer.capacity | -1
| Max size of buffer for
reading data. If negative, JVM heap size * read.buffer.ratio is used
[...]
+| rss.server.read.buffer.capacity.ratio | 0.2
| when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size or off-heap size(when enabling Netty) * ratio
[...]
+| rss.server.heartbeat.interval | 10000
| Heartbeat interval to
Coordinator (ms)
[...]
+| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000
| How often to collect
Netty pending tasks number metrics (in milliseconds)
[...]
+| rss.server.flush.localfile.threadPool.size | 10
| Thread pool for flush
data to local file
[...]
+| rss.server.flush.hadoop.threadPool.size | 60
| Thread pool for flush
data to hadoop storage
[...]
+| rss.server.commit.timeout | 600000
| Timeout when commit
shuffle data (ms)
[...]
+| rss.storage.type | -
| Supports
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS
[...]
+| rss.server.flush.cold.storage.threshold.size | 64M
| The threshold of data
size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used
[...]
+| rss.server.tags | -
| The comma-separated list
of tags to indicate the shuffle server's attributes. It will be used as the
assignment basis for the coordinator
[...]
+| rss.server.single.buffer.flush.enabled | true
| Whether single buffer
flush when size exceeded rss.server.single.buffer.flush.threshold
[...]
+| rss.server.single.buffer.flush.threshold | 128M
| The threshold of single
shuffle buffer flush
[...]
+| rss.server.disk.capacity | -1
| Disk capacity that
shuffle server can use. If negative, it will use disk whole space * ratio
[...]
+| rss.server.disk.capacity.ratio | 0.9
| When
`rss.server.disk.capacity` is negative, disk whole space * ratio is used
[...]
+| rss.server.hybrid.storage.fallback.strategy.class | -
| The fallback strategy
for `MEMORY_LOCALFILE_HDFS`. Support
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`.
If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackS
[...]
+| rss.server.leak.shuffledata.check.interval | 3600000
| The interval of leak
shuffle data check (ms)
[...]
+| rss.server.max.concurrency.of.per-partition.write | 30
| The max concurrency of
single partition writer, the data partition file number is equal to this value.
Default value is 1. This config could improve the writing speed, especially for
huge partition.
[...]
+| rss.server.max.concurrency.limit.of.per-partition.write | -
| The limit for max
concurrency per-partition write specified by client, this won't be enabled by
default.
[...]
+| rss.metrics.reporter.class | -
| The class of metrics
reporter.
[...]
+| rss.server.hybrid.storage.manager.selector.class |
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is
`DefaultStorageManagerSelector`, and another
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's
data to cold storage.
[...]
+| rss.server.disk-capacity.watermark.check.enabled | false
| If it is co-located with
other services, the high-low watermark check based on the uniffle used is not
correct. Due to this, the whole disk capacity watermark check is necessary,
which will reuse the current watermark value. It will be disabled by default.
[...]
### Advanced Configurations
| Property Name | Default | Description
|
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 088252779..fe3979012 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -89,6 +89,13 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10 * 1000L)
.withDescription("Direct memory usage tracker interval to
MetricSystem (ms)");
+ public static final ConfigOption<Long>
SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL =
+
ConfigOptions.key("rss.server.netty.metrics.pendingTaskNumPollingIntervalMs")
+ .longType()
+ .defaultValue(10 * 1000L)
+ .withDescription(
+ "How often to collect Netty pending tasks number metrics (in
milliseconds)");
+
public static final ConfigOption<Integer>
SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE =
ConfigOptions.key("rss.server.flush.localfile.threadPool.size")
.intType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index e1eabe060..f8410fcc9 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -18,7 +18,10 @@
package org.apache.uniffle.server.netty;
import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@@ -31,6 +34,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.SystemPropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -59,6 +64,12 @@ public class StreamServer implements ServerInterface {
private ShuffleServerConf shuffleServerConf;
private ChannelFuture channelFuture;
+ private final ScheduledExecutorService nettyPendingTasksNumTracker =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("NettyPendingTasksNumTracker"));
+ /** Interval to poll for Netty pending tasks number for Netty metrics, in
milliseconds */
+ private final long pendingTasksNumMetricsPollingInterval;
+
public StreamServer(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
this.shuffleServerConf = shuffleServer.getShuffleServerConf();
@@ -80,6 +91,37 @@ public class StreamServer implements ServerInterface {
shuffleBossGroup = new NioEventLoopGroup(acceptThreads);
shuffleWorkerGroup = new NioEventLoopGroup(workerThreads);
}
+ this.pendingTasksNumMetricsPollingInterval =
+ shuffleServerConf.getLong(
+ ShuffleServerConf.SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL);
+ startMonitoringPendingTasks();
+ }
+
+ private void startMonitoringPendingTasks() {
+ nettyPendingTasksNumTracker.scheduleAtFixedRate(
+ () -> {
+ int pendingTasksNumForBossGroup =
getPendingTasksForEventLoopGroup(shuffleBossGroup);
+ shuffleServer
+ .getNettyMetrics()
+ .getGaugeNettyPendingTasksNumForBossGroup()
+ .set(pendingTasksNumForBossGroup);
+
+ int pendingTasksNumForWorkerGroup =
getPendingTasksForEventLoopGroup(shuffleWorkerGroup);
+ shuffleServer
+ .getNettyMetrics()
+ .getGaugeNettyPendingTasksNumForWorkerGroup()
+ .set(pendingTasksNumForWorkerGroup);
+ },
+ 0L,
+ pendingTasksNumMetricsPollingInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private int getPendingTasksForEventLoopGroup(EventLoopGroup eventLoopGroup) {
+ return StreamSupport.stream(eventLoopGroup.spliterator(), false)
+ .filter(eventExecutor -> eventExecutor instanceof
SingleThreadEventExecutor)
+ .mapToInt(eventExecutor -> ((SingleThreadEventExecutor)
eventExecutor).pendingTasks())
+ .sum();
}
private ServerBootstrap bootstrapChannel(
@@ -177,6 +219,9 @@ public class StreamServer implements ServerInterface {
shuffleBossGroup = null;
shuffleWorkerGroup = null;
}
+ if (!nettyPendingTasksNumTracker.isShutdown()) {
+ nettyPendingTasksNumTracker.shutdown();
+ }
}
@Override
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index a5824a5b0..c4462f1c2 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -239,7 +239,7 @@ public class ShuffleServerMetricsTest {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
- assertEquals(66, actualObj.get("metrics").size());
+ assertEquals(68, actualObj.get("metrics").size());
}
@Test
