This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 403a7a956 [#1586] improvement(netty): Allow Netty Worker thread pool 
size to dynamically adapt to the number of processor cores (#1587)
403a7a956 is described below

commit 403a7a956069a0b1965ec4b7914c4c3805d41190
Author: RickyMa <[email protected]>
AuthorDate: Tue Mar 19 15:23:43 2024 +0800

    [#1586] improvement(netty): Allow Netty Worker thread pool size to 
dynamically adapt to the number of processor cores (#1587)
    
    ### What changes were proposed in this pull request?
    
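    Allow the Netty worker thread pool size to adapt dynamically to the number
    of processor cores. When `rss.server.netty.worker.thread` is 0 (the new
    default), the worker thread count is the larger of 100 and the value of the
    `io.netty.eventLoopThreads` system property, which falls back to twice the
    number of available processors when unset.
    This gives a more reasonable default value and better performance in most
    cases.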
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1586.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 docs/server_guide.md                                         |  2 +-
 .../java/org/apache/uniffle/server/ShuffleServerConf.java    |  7 +++++--
 .../java/org/apache/uniffle/server/netty/StreamServer.java   | 12 ++++++++++++
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index 9d6585987..25224d6d6 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -77,7 +77,7 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.server.netty.port                                   | -1                 
                                                    | Netty port for Shuffle 
server, if set zero, Netty server start on random port.                         
                                                                                
                                                                                
                                                                                
                 [...]
 | rss.server.netty.epoll.enable                           | false              
                                                    | If enable epoll model 
with Netty server.                                                              
                                                                                
                                                                                
                                                                                
                  [...]
 | rss.server.netty.accept.thread                          | 10                 
                                                    | Accept thread count in 
netty.                                                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| rss.server.netty.worker.thread                          | 100                
                                                    | Worker thread count in 
netty.                                                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.netty.worker.thread                          | 0                  
                                                    | Worker thread count in 
netty. When set to 0, the default value is dynamically set to twice the number 
of processor cores, but it will not be less than 100 to ensure the minimum 
throughput of the service.                                                      
                                                                                
                       [...]
 | rss.server.netty.connect.backlog                        | 0                  
                                                    | For Netty server, 
requested maximum length of the queue of incoming connections.                  
                                                                                
                                                                                
                                                                                
                      [...]
 | rss.server.netty.connect.timeout                        | 5000               
                                                    | Timeout for connection in 
netty.                                                                          
                                                                                
                                                                                
                                                                                
              [...]
 | rss.server.netty.receive.buf                            | 0                  
                                                    | Receive buffer size 
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should 
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system 
automatically estimates the receive buffer size based on default settings.      
                               [...]
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a6cd26547..e72510eae 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -500,8 +500,11 @@ public class ShuffleServerConf extends RssBaseConf {
   public static final ConfigOption<Integer> NETTY_SERVER_WORKER_THREAD =
       ConfigOptions.key("rss.server.netty.worker.thread")
           .intType()
-          .defaultValue(100)
-          .withDescription("Worker thread count in netty");
+          .defaultValue(0)
+          .withDescription(
+              "Worker thread count in netty. When set to 0, "
+                  + "the default value is dynamically set to twice the number 
of processor cores, "
+                  + "but it will not be less than 100 to ensure the minimum 
throughput of the service.");
 
   public static final ConfigOption<Long> SERVER_NETTY_HANDLER_IDLE_TIMEOUT =
       ConfigOptions.key("rss.server.netty.handler.idle.timeout")
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java 
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index d7990a126..e1eabe060 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -30,6 +30,8 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.NettyRuntime;
+import io.netty.util.internal.SystemPropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,9 @@ public class StreamServer implements ServerInterface {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamServer.class);
 
+  /** Use this value to ensure the minimum throughput. */
+  private static final int MIN_NETTY_SERVER_WORKER_THREAD_COUNT_DEFAULT = 100;
+
   private ShuffleServer shuffleServer;
   private EventLoopGroup shuffleBossGroup;
   private EventLoopGroup shuffleWorkerGroup;
@@ -61,6 +66,13 @@ public class StreamServer implements ServerInterface {
         
shuffleServerConf.getBoolean(ShuffleServerConf.NETTY_SERVER_EPOLL_ENABLE);
     int acceptThreads = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_ACCEPT_THREAD);
     int workerThreads = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_WORKER_THREAD);
+    if (workerThreads == 0) {
+      workerThreads =
+          Math.max(
+              MIN_NETTY_SERVER_WORKER_THREAD_COUNT_DEFAULT,
+              SystemPropertyUtil.getInt(
+                  "io.netty.eventLoopThreads", 
NettyRuntime.availableProcessors() * 2));
+    }
     if (isEpollEnable) {
       shuffleBossGroup = new EpollEventLoopGroup(acceptThreads);
       shuffleWorkerGroup = new EpollEventLoopGroup(workerThreads);

Reply via email to