This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d5c2d2edb [#2107] improvement(client): Optimize the conf of the number 
of threads of Netty client (#2108)
d5c2d2edb is described below

commit d5c2d2edbbd8b5c31fa5ec73c6fee25491601631
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Sep 18 17:38:37 2024 +0800

    [#2107] improvement(client): Optimize the conf of the number of threads of 
Netty client (#2108)
    
    ### What changes were proposed in this pull request?
    
    Make `rss.client.netty.client.connections.per.peer` * 
`rss.client.netty.client.threads.ratio` as the thread number of workerGroup if 
`rss.client.netty.client.threads` is not set.
    
    ### Why are the changes needed?
    
    Fix: #2107
    
    ### Does this PR introduce _any_ user-facing change?
    
    New param: `rss.client.netty.client.threads.ratio`
    
    ### How was this patch tested?
    
    CI
    
    ---------
    
    Co-authored-by: zhengchenyu001 <[email protected]>
---
 .../java/org/apache/uniffle/common/config/RssClientConf.java   |  9 +++++++++
 .../uniffle/common/netty/client/TransportClientFactory.java    | 10 +++++++---
 .../org/apache/uniffle/common/netty/client/TransportConf.java  |  4 ++++
 docs/client_guide/client_guide.md                              |  1 +
 4 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index f3e16de32..c0ade1073 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -157,6 +157,15 @@ public class RssClientConf {
           .defaultValue(0)
           .withDescription("Number of threads used in the client thread 
pool.");
 
+  public static final ConfigOption<Double> NETTY_CLIENT_THREADS_RATIO =
+      ConfigOptions.key("rss.client.netty.client.threads.ratio")
+          .doubleType()
+          .defaultValue(2.0)
+          .withDescription(
+              "The number of threads used in the client thread pool will be "
+                  + "(`rss.client.netty.client.connections.per.peer` * 
`rss.client.netty.client.threads.ratio`). "
+                  + "This is only effective when 
`rss.client.netty.client.threads` is not explicitly set");
+
   public static final ConfigOption<Boolean> NETTY_CLIENT_PREFER_DIRECT_BUFS =
       ConfigOptions.key("rss.client.netty.client.prefer.direct.bufs")
           .booleanType()
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index 55ec5c245..89529c81f 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -85,18 +85,22 @@ public class TransportClientFactory implements Closeable {
 
     IOMode ioMode = conf.ioMode();
     this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
-    this.workerGroup = NettyUtils.createEventLoop(ioMode, 
conf.clientThreads(), "netty-rpc-client");
+    int clientThreads =
+        conf.clientThreads() > 0
+            ? conf.clientThreads()
+            : (int) (numConnectionsPerPeer * conf.clientThreadsRatio());
+    this.workerGroup = NettyUtils.createEventLoop(ioMode, clientThreads, 
"netty-rpc-client");
     if (conf.isSharedAllocatorEnabled()) {
       this.byteBufAllocator =
           conf.isPooledAllocatorEnabled()
               ? NettyUtils.getSharedPooledByteBufAllocator(
-                  conf.preferDirectBufs(), false, conf.clientThreads())
+                  conf.preferDirectBufs(), false, clientThreads)
               : 
NettyUtils.getSharedUnpooledByteBufAllocator(conf.preferDirectBufs());
     } else {
       this.byteBufAllocator =
           conf.isPooledAllocatorEnabled()
               ? NettyUtils.createPooledByteBufAllocator(
-                  conf.preferDirectBufs(), false, conf.clientThreads())
+                  conf.preferDirectBufs(), false, clientThreads)
               : 
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
     }
     if (logger.isDebugEnabled()) {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
index 64a6e912d..8ba2999c1 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
@@ -45,6 +45,10 @@ public class TransportConf {
     return rssConf.get(RssClientConf.NETTY_CLIENT_THREADS);
   }
 
+  public double clientThreadsRatio() {
+    return rssConf.get(RssClientConf.NETTY_CLIENT_THREADS_RATIO);
+  }
+
   public int numConnectionsPerPeer() {
     return rssConf.get(RssClientConf.NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER);
   }
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index ccf4e437e..bf7bb5d16 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -106,6 +106,7 @@ spark.rss.data.replica.read 2
 | <client_type>.rss.client.netty.io.mode                         | NIO     | 
Netty EventLoopGroup backend, available options: NIO, EPOLL.                    
                                                                                
                                                                                
                                                                                
                                                |
 | <client_type>.rss.client.netty.client.connection.timeout.ms    | 600000  | 
Connection active timeout.                                                      
                                                                                
                                                                                
                                                                                
                                                |
 | <client_type>.rss.client.netty.client.threads                  | 0       | 
Number of threads used in the client thread pool. Default is 0, Netty will use 
the number of (available logical cores * 2) as the number of threads.           
                                                                                
                                                                                
                                                 |
+| <client_type>.rss.client.netty.client.threads.ratio            | 2.0     | 
The number of threads used in the client thread pool will be 
(`<client_type>.rss.client.netty.client.connections.per.peer` * 
`<client_type>.rss.client.netty.client.threads.ratio`). This is only effective 
when `<client_type>.rss.client.netty.client.threads` is not explicitly set.     
                                                                                
    |                                               [...]
 | <client_type>.rss.client.netty.client.prefer.direct.bufs       | true    | 
If true, we will prefer allocating off-heap byte buffers within Netty.          
                                                                                
                                                                                
                                                                                
                                                |
 | <client_type>.rss.client.netty.client.pooled.allocator.enabled | true    | 
If true, we will use PooledByteBufAllocator to allocate byte buffers within 
Netty, otherwise we'll use UnpooledByteBufAllocator.                            
                                                                                
                                                                                
                                                    |
 | <client_type>.rss.client.netty.client.shared.allocator.enabled | true    | A 
flag indicating whether to share the ByteBuf allocators between the different 
Netty channels when enabling Netty. If enabled then only three ByteBuf 
allocators are created: one PooledByteBufAllocator where caching is allowed, 
one PooledByteBufAllocator where not and one UnpooledByteBufAllocator. When 
disabled, a new allocator is created for each transport client. |

Reply via email to