This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d5c2d2edb [#2107] improvement(client): Optimize the conf of the number
of threads of Netty client (#2108)
d5c2d2edb is described below
commit d5c2d2edbbd8b5c31fa5ec73c6fee25491601631
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Sep 18 17:38:37 2024 +0800
[#2107] improvement(client): Optimize the conf of the number of threads of
Netty client (#2108)
### What changes were proposed in this pull request?
When `rss.client.netty.client.threads` is not set, use
`rss.client.netty.client.connections.per.peer` *
`rss.client.netty.client.threads.ratio` as the number of workerGroup threads.
### Why are the changes needed?
Fix: #2107
### Does this PR introduce _any_ user-facing change?
New param: `rss.client.netty.client.threads.ratio`
### How was this patch tested?
CI
---------
Co-authored-by: zhengchenyu001 <[email protected]>
---
.../java/org/apache/uniffle/common/config/RssClientConf.java | 9 +++++++++
.../uniffle/common/netty/client/TransportClientFactory.java | 10 +++++++---
.../org/apache/uniffle/common/netty/client/TransportConf.java | 4 ++++
docs/client_guide/client_guide.md | 1 +
4 files changed, 21 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index f3e16de32..c0ade1073 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -157,6 +157,15 @@ public class RssClientConf {
.defaultValue(0)
.withDescription("Number of threads used in the client thread
pool.");
+ public static final ConfigOption<Double> NETTY_CLIENT_THREADS_RATIO =
+ ConfigOptions.key("rss.client.netty.client.threads.ratio")
+ .doubleType()
+ .defaultValue(2.0)
+ .withDescription(
+ "The number of threads used in the client thread pool will be "
+ + "(`rss.client.netty.client.connections.per.peer` *
`rss.client.netty.client.threads.ratio`). "
+ + "This is only effective when
`rss.client.netty.client.threads` is not explicitly set");
+
public static final ConfigOption<Boolean> NETTY_CLIENT_PREFER_DIRECT_BUFS =
ConfigOptions.key("rss.client.netty.client.prefer.direct.bufs")
.booleanType()
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index 55ec5c245..89529c81f 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -85,18 +85,22 @@ public class TransportClientFactory implements Closeable {
IOMode ioMode = conf.ioMode();
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
- this.workerGroup = NettyUtils.createEventLoop(ioMode,
conf.clientThreads(), "netty-rpc-client");
+ int clientThreads =
+ conf.clientThreads() > 0
+ ? conf.clientThreads()
+ : (int) (numConnectionsPerPeer * conf.clientThreadsRatio());
+ this.workerGroup = NettyUtils.createEventLoop(ioMode, clientThreads,
"netty-rpc-client");
if (conf.isSharedAllocatorEnabled()) {
this.byteBufAllocator =
conf.isPooledAllocatorEnabled()
? NettyUtils.getSharedPooledByteBufAllocator(
- conf.preferDirectBufs(), false, conf.clientThreads())
+ conf.preferDirectBufs(), false, clientThreads)
:
NettyUtils.getSharedUnpooledByteBufAllocator(conf.preferDirectBufs());
} else {
this.byteBufAllocator =
conf.isPooledAllocatorEnabled()
? NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), false, conf.clientThreads())
+ conf.preferDirectBufs(), false, clientThreads)
:
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
}
if (logger.isDebugEnabled()) {
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
index 64a6e912d..8ba2999c1 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
@@ -45,6 +45,10 @@ public class TransportConf {
return rssConf.get(RssClientConf.NETTY_CLIENT_THREADS);
}
+ public double clientThreadsRatio() {
+ return rssConf.get(RssClientConf.NETTY_CLIENT_THREADS_RATIO);
+ }
+
public int numConnectionsPerPeer() {
return rssConf.get(RssClientConf.NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER);
}
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index ccf4e437e..bf7bb5d16 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -106,6 +106,7 @@ spark.rss.data.replica.read 2
| <client_type>.rss.client.netty.io.mode | NIO |
Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 |
Connection active timeout.
|
| <client_type>.rss.client.netty.client.threads | 0 |
Number of threads used in the client thread pool. Default is 0, Netty will use
the number of (available logical cores * 2) as the number of threads.
|
+| <client_type>.rss.client.netty.client.threads.ratio | 2.0 |
The number of threads used in the client thread pool will be
(`<client_type>.rss.client.netty.client.connections.per.peer` *
`<client_type>.rss.client.netty.client.threads.ratio`). This is only effective
when `<client_type>.rss.client.netty.client.threads` is not explicitly set.
| [...]
| <client_type>.rss.client.netty.client.prefer.direct.bufs | true |
If true, we will prefer allocating off-heap byte buffers within Netty.
|
| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true |
If true, we will use PooledByteBufAllocator to allocate byte buffers within
Netty, otherwise we'll use UnpooledByteBufAllocator.
|
| <client_type>.rss.client.netty.client.shared.allocator.enabled | true | A
flag indicating whether to share the ByteBuf allocators between the different
Netty channels when enabling Netty. If enabled then only three ByteBuf
allocators are created: one PooledByteBufAllocator where caching is allowed,
one PooledByteBufAllocator where not and one UnpooledByteBufAllocator. When
disabled, a new allocator is created for each transport client. |