This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new cc3f52b6f [#1767] feat(netty): The client-side supports choosing
Netty's ByteBufAllocator (#1768)
cc3f52b6f is described below
commit cc3f52b6f4938fce16f77a745dc20b332c7d25f7
Author: RickyMa <[email protected]>
AuthorDate: Tue Jun 11 13:39:09 2024 +0800
[#1767] feat(netty): The client-side supports choosing Netty's
ByteBufAllocator (#1768)
### What changes were proposed in this pull request?
The client-side supports choosing Netty's ByteBufAllocator.
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1767.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../apache/uniffle/common/config/RssClientConf.java | 9 ++++++++-
.../common/netty/client/TransportClientFactory.java | 14 ++++++++------
.../uniffle/common/netty/client/TransportConf.java | 6 +++++-
docs/client_guide/client_guide.md | 21 +++++++++++----------
4 files changed, 32 insertions(+), 18 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index b45abc871..8aeb09afd 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -132,13 +132,20 @@ public class RssClientConf {
.defaultValue(0)
.withDescription("Number of threads used in the client thread
pool.");
- public static final ConfigOption<Boolean> NETWORK_CLIENT_PREFER_DIRECT_BUFS =
+ public static final ConfigOption<Boolean> NETTY_CLIENT_PREFER_DIRECT_BUFS =
ConfigOptions.key("rss.client.netty.client.prefer.direct.bufs")
.booleanType()
.defaultValue(true)
.withDescription(
"If true, we will prefer allocating off-heap byte buffers within
Netty.");
+ public static final ConfigOption<Boolean>
NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED =
+ ConfigOptions.key("rss.client.netty.client.pooled.allocator.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "If true, we will use PooledByteBufAllocator to allocate byte
buffers within Netty, otherwise we'll use UnpooledByteBufAllocator.");
+
public static final ConfigOption<Integer>
NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER =
ConfigOptions.key("rss.client.netty.client.connections.per.peer")
.intType()
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index 2ede795f4..f775ef452 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -73,7 +73,7 @@ public class TransportClientFactory implements Closeable {
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
- private PooledByteBufAllocator pooledAllocator;
+ private AbstractByteBufAllocator byteBufAllocator;
public TransportClientFactory(TransportContext context) {
this.context = Objects.requireNonNull(context);
@@ -85,9 +85,11 @@ public class TransportClientFactory implements Closeable {
IOMode ioMode = conf.ioMode();
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
this.workerGroup = NettyUtils.createEventLoop(ioMode,
conf.clientThreads(), "netty-rpc-client");
- this.pooledAllocator =
- NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), false, conf.clientThreads());
+ this.byteBufAllocator =
+ conf.isPooledAllocatorEnabled()
+ ? NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false, conf.clientThreads())
+ :
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
}
public TransportClient createClient(String remoteHost, int remotePort, int
partitionId)
@@ -179,7 +181,7 @@ public class TransportClientFactory implements Closeable {
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs())
- .option(ChannelOption.ALLOCATOR, pooledAllocator);
+ .option(ChannelOption.ALLOCATOR, byteBufAllocator);
if (conf.receiveBuf() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
index b71034508..684645080 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
@@ -50,7 +50,11 @@ public class TransportConf {
}
public boolean preferDirectBufs() {
- return rssConf.get(RssClientConf.NETWORK_CLIENT_PREFER_DIRECT_BUFS);
+ return rssConf.get(RssClientConf.NETTY_CLIENT_PREFER_DIRECT_BUFS);
+ }
+
+ public boolean isPooledAllocatorEnabled() {
+ return rssConf.get(RssClientConf.NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED);
}
public int receiveBuf() {
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index b2c98fcb5..28e6ca88c 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -96,13 +96,14 @@ spark.rss.data.replica.read 2
```
### Netty Setting
-| Property Name | Default |
Description
|
-|-------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| <client_type>.rss.client.type | GRPC |
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the client side for better stability and
performance.
|
-| <client_type>.rss.client.netty.io.mode | NIO |
Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
-| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 |
Connection active timeout.
|
-| <client_type>.rss.client.netty.client.threads | 0 |
Number of threads used in the client thread pool. Default is 0, Netty will use
the number of (available logical cores * 2) as the number of threads.
|
-| <client_type>.rss.client.netty.client.prefer.direct.bufs | true | If
true, we will prefer allocating off-heap byte buffers within Netty.
|
-| <client_type>.rss.client.netty.client.connections.per.peer | 2 |
Suppose there are 100 executors,
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer
will establish a total of (100 * 2) connections with multiple clients.
|
-| <client_type>.rss.client.netty.client.receive.buffer | 0 |
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and
send buffer should be latency * network_bandwidth. Assuming latency = 1ms,
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the
operating system automatically estimates the receive buffer size based on
default settings. |
-| <client_type>.rss.client.netty.client.send.buffer | 0 | Send
buffer size (SO_SNDBUF).
|
+| Property Name | Default |
Description
|
+|----------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <client_type>.rss.client.type | GRPC |
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the client side for better stability and
performance.
|
+| <client_type>.rss.client.netty.io.mode | NIO |
Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
+| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 |
Connection active timeout.
|
+| <client_type>.rss.client.netty.client.threads | 0 |
Number of threads used in the client thread pool. Default is 0, Netty will use
the number of (available logical cores * 2) as the number of threads.
|
+| <client_type>.rss.client.netty.client.prefer.direct.bufs | true |
If true, we will prefer allocating off-heap byte buffers within Netty.
|
+| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true |
If true, we will use PooledByteBufAllocator to allocate byte buffers within
Netty, otherwise we'll use UnpooledByteBufAllocator.
|
+| <client_type>.rss.client.netty.client.connections.per.peer | 2 |
Suppose there are 100 executors,
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer
will establish a total of (100 * 2) connections with multiple clients.
|
+| <client_type>.rss.client.netty.client.receive.buffer | 0 |
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and
send buffer should be latency * network_bandwidth. Assuming latency = 1ms,
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the
operating system automatically estimates the receive buffer size based on
default settings. |
+| <client_type>.rss.client.netty.client.send.buffer | 0 |
Send buffer size (SO_SNDBUF).
|