This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fd715b41a [CELEBORN-1993] CelebornConf introduces celeborn.<module>.io.threads to specify number of threads used in the client thread pool
fd715b41a is described below

commit fd715b41af6eace7d50a5c042c7f1bb0bfac84dd
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 20 17:44:38 2025 +0800

    [CELEBORN-1993] CelebornConf introduces celeborn.<module>.io.threads to specify number of threads used in the client thread pool
    
    ### What changes were proposed in this pull request?
    
    `CelebornConf` introduces `celeborn.<module>.io.threads` to specify the number of threads used in the client thread pool.
    
    ### Why are the changes needed?
    
    `ShuffleClientImpl` and `FlinkShuffleClientImpl` read the thread count with the hard-coded expression `conf.getInt("celeborn." + module + ".io.threads", 8)`, so the option is neither declared in `CelebornConf` nor listed in the configuration docs. `CelebornConf` should therefore introduce `celeborn.<module>.io.threads` to specify the number of threads used in the client thread pool.
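    
    As a rough illustration of the per-module lookup described above, the following self-contained Java sketch substitutes a module name into the `celeborn.<module>.io.threads` key template and falls back to the default of 8 when nothing is configured. `ModuleConfSketch`, `resolveKey` and `MODULE_PLACEHOLDER` are hypothetical names used only for this example, and the placeholder substitution is an assumption about how such a key might be resolved, not a copy of the `CelebornConf` implementation.
    
    ```java
    import java.util.Map;

    /**
     * Illustrative sketch only: a tiny stand-in for a per-module config lookup.
     * ModuleConfSketch, resolveKey and MODULE_PLACEHOLDER are hypothetical names,
     * not part of Celeborn.
     */
    final class ModuleConfSketch {
      // Key template and default taken from the commit: celeborn.<module>.io.threads = 8.
      static final String IO_THREADS_KEY = "celeborn.<module>.io.threads";
      static final int IO_THREADS_DEFAULT = 8;
      static final String MODULE_PLACEHOLDER = "<module>";

      private final Map<String, String> settings;

      ModuleConfSketch(Map<String, String> settings) {
        this.settings = settings;
      }

      // Assumed behaviour: substitute the module name into the key template.
      static String resolveKey(String template, String module) {
        return template.replace(MODULE_PLACEHOLDER, module);
      }

      // Plays the role of a typed accessor such as networkIoThreads(module).
      int networkIoThreads(String module) {
        String value = settings.get(resolveKey(IO_THREADS_KEY, module));
        return value == null ? IO_THREADS_DEFAULT : Integer.parseInt(value.trim());
      }

      public static void main(String[] args) {
        ModuleConfSketch conf =
            new ModuleConfSketch(Map.of("celeborn.data.io.threads", "16"));
        System.out.println(conf.networkIoThreads("data"));      // prints 16 (explicitly set)
        System.out.println(conf.networkIoThreads("replicate")); // prints 8 (default)
      }
    }
    ```
    
    Keeping the key template and the default in one place is what lets callers ask for a typed value by module name instead of rebuilding the key string at every call site.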
    
    ### Does this PR introduce _any_ user-facing change?
    
    `CelebornConf` adds the `celeborn.<module>.io.threads` config option.
    
    ### How was this patch tested?
    
    No.
    
    Closes #3245 from SteNicholas/CELEBORN-1993.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../plugin/flink/client/FlinkShuffleClientImpl.java  |  2 +-
 .../apache/celeborn/client/ShuffleClientImpl.java    |  3 +--
 .../org/apache/celeborn/common/CelebornConf.scala    | 20 +++++++++++++++++---
 docs/configuration/network.md                        |  3 ++-
 4 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 109e7c34e..9886620c3 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -174,7 +174,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
     this.bufferSizeBytes = bufferSizeBytes;
     String module = TransportModuleConstants.DATA_MODULE;
     TransportConf dataTransportConf =
-        Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module + ".io.threads", 8));
+        Utils.fromCelebornConf(conf, module, conf.networkIoThreads(module));
     this.context =
         new TransportContext(
             dataTransportConf, readClientHandler, conf.clientCloseIdleConnections());
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 9c09e3604..1d540242c 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -222,8 +222,7 @@ public class ShuffleClientImpl extends ShuffleClient {
             scala.None$.empty());
 
     String module = TransportModuleConstants.DATA_MODULE;
-    dataTransportConf =
-        Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module + ".io.threads", 8));
+    dataTransportConf = Utils.fromCelebornConf(conf, module, conf.networkIoThreads(module));
     initDataClientFactoryIfNeeded();
     int pushDataRetryThreads = conf.clientPushRetryThreads();
     pushDataRetryPool =
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index d33028a9a..8dbccaa1f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,7 +514,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key)
   def networkConnectTimeout: RpcTimeout =
     new RpcTimeout(get(NETWORK_CONNECT_TIMEOUT).milli, NETWORK_CONNECT_TIMEOUT.key)
-  def rpcIoThreads: Option[Int] = get(RPC_IO_THREAD)
+  def rpcIoThreads: Option[Int] = get(RPC_IO_THREADS)
   def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS)
   def rpcLookupTimeout: RpcTimeout =
     new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
@@ -565,6 +565,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     getTransportConfInt(module, NETWORK_IO_BACKLOG)
   }
 
+  def networkIoThreads(module: String): Int = {
+    getTransportConfInt(module, NETWORK_IO_THREADS)
+  }
+
   def networkIoServerThreads(module: String): Int = {
     getTransportConfInt(module, NETWORK_IO_SERVER_THREADS)
   }
@@ -1906,7 +1910,7 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(1)
 
-  val RPC_IO_THREAD: OptionalConfigEntry[Int] =
+  val RPC_IO_THREADS: OptionalConfigEntry[Int] =
     buildConf("celeborn.rpc.io.threads")
       .categories("network")
       .doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " +
@@ -2090,6 +2094,16 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(0)
 
+  val NETWORK_IO_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.<module>.io.threads")
+      .categories("network")
+      .doc(s"Default number of threads used in the server and client thread pool. " +
+        s"This specifies thread configuration based on JVM's allocation of cores. " +
+        s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
+        s"it works for shuffle client push and fetch data.")
+      .intConf
+      .createWithDefault(8)
+
   val NETWORK_IO_SERVER_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.<module>.io.serverThreads")
       .categories("network")
@@ -2116,7 +2130,7 @@ object CelebornConf extends Logging {
         s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
         s"works for master or worker. " +
         s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
-        s"it works for shuffle client push and fetch data. " +
+        s"it works for shuffle client push and fetch data, of which default value is determined by ${NETWORK_IO_THREADS.key} . " +
         s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
         s"it works for replicate client of worker replicating data to peer worker.")
       .intConf
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index b78c70ad6..721b12461 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -23,7 +23,7 @@ license: |
 | celeborn.&lt;module&gt;.fetch.timeoutCheck.threads | 4 | false | Threads num for checking fetch data timeout. It only support setting <module> to `data` since it works for shuffle client fetch data. | 0.3.0 |  | 
 | celeborn.&lt;module&gt;.heartbeat.interval | 60s | false | The heartbeat interval between worker and client. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `fetch`, it works for worker fetch server. If you are using the "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.push.heartbeat.interval" and "celeborn.fetch.heartbeat.interval".  | 0.3.0 | celeborn.cl [...]
 | celeborn.&lt;module&gt;.io.backLog | 0 | false | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting <module> to `fetch`, it works for  [...]
-| celeborn.&lt;module&gt;.io.clientThreads | 0 | false | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. |  |  | 
+| celeborn.&lt;module&gt;.io.clientThreads | 0 | false | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data, of which default value is determined by celeborn.<module>.io.threads . If setting <module> to `replicate`, it works for replicate client of worker [...]
 | celeborn.&lt;module&gt;.io.conflictAvoidChooser.enable | false | false | Whether to use conflict avoid event executor chooser in the client thread pool. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.5.4 |  | 
 | celeborn.&lt;module&gt;.io.connectTimeout | &lt;value of celeborn.network.connect.timeout&gt; | false | Socket connect timeout. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for the replicate client of worker replicating data to peer worker. |  |  | 
 | celeborn.&lt;module&gt;.io.connectionTimeout | &lt;value of celeborn.network.timeout&gt; | false | Connection active timeout. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker  [...]
@@ -37,6 +37,7 @@ license: |
 | celeborn.&lt;module&gt;.io.saslTimeout | 30s | false | Timeout for a single round trip of auth message exchange, in milliseconds. | 0.5.0 |  | 
 | celeborn.&lt;module&gt;.io.sendBuffer | 0b | false | Send buffer size (SO_SNDBUF). If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting [...]
 | celeborn.&lt;module&gt;.io.serverThreads | 0 | false | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting <module> to `fetch`, it works for  [...]
+| celeborn.&lt;module&gt;.io.threads | 8 | false | Default number of threads used in the server and client thread pool. This specifies thread configuration based on JVM's allocation of cores. If setting <module> to `data`, it works for shuffle client push and fetch data. |  |  | 
 | celeborn.&lt;module&gt;.push.timeoutCheck.interval | 5s | false | Interval for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 |  | 
 | celeborn.&lt;module&gt;.push.timeoutCheck.threads | 4 | false | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 |  | 
 | celeborn.&lt;role&gt;.rpc.dispatcher.threads | &lt;value of celeborn.rpc.dispatcher.threads&gt; | false | Threads number of message dispatcher event loop for roles |  |  | 
