This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 403a7a956 [#1586] improvement(netty): Allow Netty Worker thread pool
size to dynamically adapt to the number of processor cores (#1587)
403a7a956 is described below
commit 403a7a956069a0b1965ec4b7914c4c3805d41190
Author: RickyMa <[email protected]>
AuthorDate: Tue Mar 19 15:23:43 2024 +0800
[#1586] improvement(netty): Allow Netty Worker thread pool size to
dynamically adapt to the number of processor cores (#1587)
### What changes were proposed in this pull request?
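Allow the Netty worker thread pool size to adapt dynamically to the number of
processor cores.
When rss.server.netty.worker.thread is 0 (the new default), the pool size is
max(100, N), where N is the io.netty.eventLoopThreads system property if set,
otherwise twice the number of available processors.
This gives a more reasonable default value and better performance in most
cases.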
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1586.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
---
docs/server_guide.md | 2 +-
.../java/org/apache/uniffle/server/ShuffleServerConf.java | 7 +++++--
.../java/org/apache/uniffle/server/netty/StreamServer.java | 12 ++++++++++++
3 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 9d6585987..25224d6d6 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -77,7 +77,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.server.netty.port | -1
| Netty port for Shuffle
server, if set zero, Netty server start on random port.
[...]
| rss.server.netty.epoll.enable | false
| If enable epoll model
with Netty server.
[...]
| rss.server.netty.accept.thread | 10
| Accept thread count in
netty.
[...]
-| rss.server.netty.worker.thread | 100
| Worker thread count in
netty.
[...]
+| rss.server.netty.worker.thread | 0
| Worker thread count in
netty. When set to 0, the default value is dynamically set to twice the number
of processor cores, but it will not be less than 100 to ensure the minimum
throughput of the service.
[...]
| rss.server.netty.connect.backlog | 0
| For Netty server,
requested maximum length of the queue of incoming connections.
[...]
| rss.server.netty.connect.timeout | 5000
| Timeout for connection in
netty.
[...]
| rss.server.netty.receive.buf | 0
| Receive buffer size
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth =
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system
automatically estimates the receive buffer size based on default settings.
[...]
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a6cd26547..e72510eae 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -500,8 +500,11 @@ public class ShuffleServerConf extends RssBaseConf {
public static final ConfigOption<Integer> NETTY_SERVER_WORKER_THREAD =
ConfigOptions.key("rss.server.netty.worker.thread")
.intType()
- .defaultValue(100)
- .withDescription("Worker thread count in netty");
+ .defaultValue(0)
+ .withDescription(
+ "Worker thread count in netty. When set to 0, "
+ + "the default value is dynamically set to twice the number
of processor cores, "
+ + "but it will not be less than 100 to ensure the minimum
throughput of the service.");
public static final ConfigOption<Long> SERVER_NETTY_HANDLER_IDLE_TIMEOUT =
ConfigOptions.key("rss.server.netty.handler.idle.timeout")
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index d7990a126..e1eabe060 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -30,6 +30,8 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.NettyRuntime;
+import io.netty.util.internal.SystemPropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +50,9 @@ public class StreamServer implements ServerInterface {
private static final Logger LOG =
LoggerFactory.getLogger(StreamServer.class);
+ /** Use this value to ensure the minimum throughput. */
+ private static final int MIN_NETTY_SERVER_WORKER_THREAD_COUNT_DEFAULT = 100;
+
private ShuffleServer shuffleServer;
private EventLoopGroup shuffleBossGroup;
private EventLoopGroup shuffleWorkerGroup;
@@ -61,6 +66,13 @@ public class StreamServer implements ServerInterface {
shuffleServerConf.getBoolean(ShuffleServerConf.NETTY_SERVER_EPOLL_ENABLE);
int acceptThreads =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_ACCEPT_THREAD);
int workerThreads =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_WORKER_THREAD);
+ if (workerThreads == 0) {
+ workerThreads =
+ Math.max(
+ MIN_NETTY_SERVER_WORKER_THREAD_COUNT_DEFAULT,
+ SystemPropertyUtil.getInt(
+ "io.netty.eventLoopThreads",
NettyRuntime.availableProcessors() * 2));
+ }
if (isEpollEnable) {
shuffleBossGroup = new EpollEventLoopGroup(acceptThreads);
shuffleWorkerGroup = new EpollEventLoopGroup(workerThreads);