This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new fd715b41a [CELEBORN-1993] CelebornConf introduces
celeborn.<module>.io.threads to specify number of threads used in the client
thread pool
fd715b41a is described below
commit fd715b41af6eace7d50a5c042c7f1bb0bfac84dd
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 20 17:44:38 2025 +0800
[CELEBORN-1993] CelebornConf introduces celeborn.<module>.io.threads to
specify number of threads used in the client thread pool
### What changes were proposed in this pull request?
`CelebornConf` introduces `celeborn.<module>.io.threads` to specify the number
of threads used in the client thread pool.
### Why are the changes needed?
`ShuffleClientImpl` and `FlinkShuffleClientImpl` read this setting through the
hard-coded expression `conf.getInt("celeborn." + module + ".io.threads", 8)`.
Therefore, `CelebornConf` should introduce `celeborn.<module>.io.threads` to
specify the number of threads used in the client thread pool.
### Does this PR introduce _any_ user-facing change?
`CelebornConf` adds the `celeborn.<module>.io.threads` config option.
### How was this patch tested?
No.
Closes #3245 from SteNicholas/CELEBORN-1993.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../plugin/flink/client/FlinkShuffleClientImpl.java | 2 +-
.../apache/celeborn/client/ShuffleClientImpl.java | 3 +--
.../org/apache/celeborn/common/CelebornConf.scala | 20 +++++++++++++++++---
docs/configuration/network.md | 3 ++-
4 files changed, 21 insertions(+), 7 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 109e7c34e..9886620c3 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -174,7 +174,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
this.bufferSizeBytes = bufferSizeBytes;
String module = TransportModuleConstants.DATA_MODULE;
TransportConf dataTransportConf =
- Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module
+ ".io.threads", 8));
+ Utils.fromCelebornConf(conf, module, conf.networkIoThreads(module));
this.context =
new TransportContext(
dataTransportConf, readClientHandler,
conf.clientCloseIdleConnections());
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 9c09e3604..1d540242c 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -222,8 +222,7 @@ public class ShuffleClientImpl extends ShuffleClient {
scala.None$.empty());
String module = TransportModuleConstants.DATA_MODULE;
- dataTransportConf =
- Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module
+ ".io.threads", 8));
+ dataTransportConf = Utils.fromCelebornConf(conf, module,
conf.networkIoThreads(module));
initDataClientFactoryIfNeeded();
int pushDataRetryThreads = conf.clientPushRetryThreads();
pushDataRetryPool =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index d33028a9a..8dbccaa1f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,7 +514,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key)
def networkConnectTimeout: RpcTimeout =
new RpcTimeout(get(NETWORK_CONNECT_TIMEOUT).milli,
NETWORK_CONNECT_TIMEOUT.key)
- def rpcIoThreads: Option[Int] = get(RPC_IO_THREAD)
+ def rpcIoThreads: Option[Int] = get(RPC_IO_THREADS)
def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS)
def rpcLookupTimeout: RpcTimeout =
new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
@@ -565,6 +565,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
getTransportConfInt(module, NETWORK_IO_BACKLOG)
}
+ def networkIoThreads(module: String): Int = {
+ getTransportConfInt(module, NETWORK_IO_THREADS)
+ }
+
def networkIoServerThreads(module: String): Int = {
getTransportConfInt(module, NETWORK_IO_SERVER_THREADS)
}
@@ -1906,7 +1910,7 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(1)
- val RPC_IO_THREAD: OptionalConfigEntry[Int] =
+ val RPC_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.rpc.io.threads")
.categories("network")
.doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " +
@@ -2090,6 +2094,16 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)
+ val NETWORK_IO_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.<module>.io.threads")
+ .categories("network")
+ .doc(s"Default number of threads used in the server and client thread
pool. " +
+ s"This specifies thread configuration based on JVM's allocation of
cores. " +
+ s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
+ s"it works for shuffle client push and fetch data.")
+ .intConf
+ .createWithDefault(8)
+
val NETWORK_IO_SERVER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.serverThreads")
.categories("network")
@@ -2116,7 +2130,7 @@ object CelebornConf extends Logging {
s"If setting <module> to
`${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
- s"it works for shuffle client push and fetch data. " +
+ s"it works for shuffle client push and fetch data, of which default
value is determined by ${NETWORK_IO_THREADS.key} . " +
s"If setting <module> to
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer
worker.")
.intConf
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index b78c70ad6..721b12461 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -23,7 +23,7 @@ license: |
| celeborn.<module>.fetch.timeoutCheck.threads | 4 | false | Threads num
for checking fetch data timeout. It only support setting <module> to `data`
since it works for shuffle client fetch data. | 0.3.0 | |
| celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat
interval between worker and client. If setting <module> to `push`, it works for
worker receiving push data. If setting <module> to `fetch`, it works for worker
fetch server. If you are using the "celeborn.client.heartbeat.interval", please
use the new configs for each module according to your needs or replace it with
"celeborn.push.heartbeat.interval" and "celeborn.fetch.heartbeat.interval". |
0.3.0 | celeborn.cl [...]
| celeborn.<module>.io.backLog | 0 | false | Requested maximum length of
the queue of incoming connections. Default 0 for no backlog. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `push`, it
works for worker receiving push data. If setting <module> to `replicate`, it
works for replicate server of worker replicating data to peer worker. If
setting <module> to `fetch`, it works for [...]
-| celeborn.<module>.io.clientThreads | 0 | false | Number of threads
used in the client thread pool. Default to 0, which is 2x#cores. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to
`replicate`, it works for replicate client of worker replicating data to peer
worker. | | |
+| celeborn.<module>.io.clientThreads | 0 | false | Number of threads
used in the client thread pool. Default to 0, which is 2x#cores. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data, of which default value is
determined by celeborn.<module>.io.threads . If setting <module> to
`replicate`, it works for replicate client of worker [...]
| celeborn.<module>.io.conflictAvoidChooser.enable | false | false |
Whether to use conflict avoid event executor chooser in the client thread pool.
If setting <module> to `rpc_app`, works for shuffle client. If setting <module>
to `rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to
`replicate`, it works for replicate client of worker replicating data to peer
worker. | 0.5.4 | |
| celeborn.<module>.io.connectTimeout | <value of
celeborn.network.connect.timeout> | false | Socket connect timeout. If
setting <module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to
`replicate`, it works for the replicate client of worker replicating data to
peer worker. | | |
| celeborn.<module>.io.connectionTimeout | <value of
celeborn.network.timeout> | false | Connection active timeout. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to `push`, it
works for worker receiving push data. If setting <module> to `replicate`, it
works for replicate server or client of worker [...]
@@ -37,6 +37,7 @@ license: |
| celeborn.<module>.io.saslTimeout | 30s | false | Timeout for a single
round trip of auth message exchange, in milliseconds. | 0.5.0 | |
| celeborn.<module>.io.sendBuffer | 0b | false | Send buffer size
(SO_SNDBUF). If setting <module> to `rpc_app`, works for shuffle client. If
setting <module> to `rpc_service`, works for master or worker. If setting
<module> to `data`, it works for shuffle client push and fetch data. If setting
<module> to `push`, it works for worker receiving push data. If setting
<module> to `replicate`, it works for replicate server or client of worker
replicating data to peer worker. If setting [...]
| celeborn.<module>.io.serverThreads | 0 | false | Number of threads
used in the server thread pool. Default to 0, which is 2x#cores. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `push`, it
works for worker receiving push data. If setting <module> to `replicate`, it
works for replicate server of worker replicating data to peer worker. If
setting <module> to `fetch`, it works for [...]
+| celeborn.<module>.io.threads | 8 | false | Default number of threads
used in the server and client thread pool. This specifies thread configuration
based on JVM's allocation of cores. If setting <module> to `data`, it works for
shuffle client push and fetch data. | | |
| celeborn.<module>.push.timeoutCheck.interval | 5s | false | Interval
for checking push data timeout. If setting <module> to `data`, it works for
shuffle client push data. If setting <module> to `push`, it works for Flink
shuffle client push data. If setting <module> to `replicate`, it works for
replicate client of worker replicating data to peer worker. | 0.3.0 | |
| celeborn.<module>.push.timeoutCheck.threads | 4 | false | Threads num
for checking push data timeout. If setting <module> to `data`, it works for
shuffle client push data. If setting <module> to `push`, it works for Flink
shuffle client push data. If setting <module> to `replicate`, it works for
replicate client of worker replicating data to peer worker. | 0.3.0 | |
| celeborn.<role>.rpc.dispatcher.threads | <value of
celeborn.rpc.dispatcher.threads> | false | Threads number of message
dispatcher event loop for roles | | |