This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new cc3f52b6f [#1767] feat(netty): The client-side supports choosing 
Netty's ByteBufAllocator (#1768)
cc3f52b6f is described below

commit cc3f52b6f4938fce16f77a745dc20b332c7d25f7
Author: RickyMa <[email protected]>
AuthorDate: Tue Jun 11 13:39:09 2024 +0800

    [#1767] feat(netty): The client-side supports choosing Netty's 
ByteBufAllocator (#1768)
    
    ### What changes were proposed in this pull request?
    
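    Add a client-side option, `rss.client.netty.client.pooled.allocator.enabled`
    (default `true`), that selects which Netty ByteBufAllocator the client uses:
    PooledByteBufAllocator when true, UnpooledByteBufAllocator otherwise.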
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1767.
    
    ### Does this PR introduce _any_ user-facing change?
    
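    Yes. A new client configuration, `rss.client.netty.client.pooled.allocator.enabled`,
    is added and documented in the client guide. It defaults to `true`, which keeps
    the existing behavior of using the pooled allocator.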
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../apache/uniffle/common/config/RssClientConf.java |  9 ++++++++-
 .../common/netty/client/TransportClientFactory.java | 14 ++++++++------
 .../uniffle/common/netty/client/TransportConf.java  |  6 +++++-
 docs/client_guide/client_guide.md                   | 21 +++++++++++----------
 4 files changed, 32 insertions(+), 18 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index b45abc871..8aeb09afd 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -132,13 +132,20 @@ public class RssClientConf {
           .defaultValue(0)
           .withDescription("Number of threads used in the client thread 
pool.");
 
-  public static final ConfigOption<Boolean> NETWORK_CLIENT_PREFER_DIRECT_BUFS =
+  public static final ConfigOption<Boolean> NETTY_CLIENT_PREFER_DIRECT_BUFS =
       ConfigOptions.key("rss.client.netty.client.prefer.direct.bufs")
           .booleanType()
           .defaultValue(true)
           .withDescription(
               "If true, we will prefer allocating off-heap byte buffers within 
Netty.");
 
+  public static final ConfigOption<Boolean> 
NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED =
+      ConfigOptions.key("rss.client.netty.client.pooled.allocator.enabled")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "If true, we will use PooledByteBufAllocator to allocate byte 
buffers within Netty, otherwise we'll use UnpooledByteBufAllocator.");
+
   public static final ConfigOption<Integer> 
NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER =
       ConfigOptions.key("rss.client.netty.client.connections.per.peer")
           .intType()
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index 2ede795f4..f775ef452 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -73,7 +73,7 @@ public class TransportClientFactory implements Closeable {
 
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
-  private PooledByteBufAllocator pooledAllocator;
+  private AbstractByteBufAllocator byteBufAllocator;
 
   public TransportClientFactory(TransportContext context) {
     this.context = Objects.requireNonNull(context);
@@ -85,9 +85,11 @@ public class TransportClientFactory implements Closeable {
     IOMode ioMode = conf.ioMode();
     this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
     this.workerGroup = NettyUtils.createEventLoop(ioMode, 
conf.clientThreads(), "netty-rpc-client");
-    this.pooledAllocator =
-        NettyUtils.createPooledByteBufAllocator(
-            conf.preferDirectBufs(), false, conf.clientThreads());
+    this.byteBufAllocator =
+        conf.isPooledAllocatorEnabled()
+            ? NettyUtils.createPooledByteBufAllocator(
+                conf.preferDirectBufs(), false, conf.clientThreads())
+            : 
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
   }
 
   public TransportClient createClient(String remoteHost, int remotePort, int 
partitionId)
@@ -179,7 +181,7 @@ public class TransportClientFactory implements Closeable {
         .option(ChannelOption.TCP_NODELAY, true)
         .option(ChannelOption.SO_KEEPALIVE, true)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs())
-        .option(ChannelOption.ALLOCATOR, pooledAllocator);
+        .option(ChannelOption.ALLOCATOR, byteBufAllocator);
 
     if (conf.receiveBuf() > 0) {
       bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
index b71034508..684645080 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
@@ -50,7 +50,11 @@ public class TransportConf {
   }
 
   public boolean preferDirectBufs() {
-    return rssConf.get(RssClientConf.NETWORK_CLIENT_PREFER_DIRECT_BUFS);
+    return rssConf.get(RssClientConf.NETTY_CLIENT_PREFER_DIRECT_BUFS);
+  }
+
+  public boolean isPooledAllocatorEnabled() {
+    return rssConf.get(RssClientConf.NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED);
   }
 
   public int receiveBuf() {
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index b2c98fcb5..28e6ca88c 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -96,13 +96,14 @@ spark.rss.data.replica.read 2
 ```
 
 ### Netty Setting
-| Property Name                                               | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
    |
-|-------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| <client_type>.rss.client.type                               | GRPC    | 
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the client side for better stability and 
performance.                                                                    
                                                                                
                |
-| <client_type>.rss.client.netty.io.mode                      | NIO     | 
Netty EventLoopGroup backend, available options: NIO, EPOLL.                    
                                                                                
                                                                                
                                                                                
    |
-| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000  | 
Connection active timeout.                                                      
                                                                                
                                                                                
                                                                                
    |
-| <client_type>.rss.client.netty.client.threads               | 0       | 
Number of threads used in the client thread pool. Default is 0, Netty will use 
the number of (available logical cores * 2) as the number of threads.           
                                                                                
                                                                                
     |
-| <client_type>.rss.client.netty.client.prefer.direct.bufs    | true    | If 
true, we will prefer allocating off-heap byte buffers within Netty.             
                                                                                
                                                                                
                                                                                
 |
-| <client_type>.rss.client.netty.client.connections.per.peer  | 2       | 
Suppose there are 100 executors, 
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer 
will establish a total of (100 * 2) connections with multiple clients.          
                                                                                
                                                   |
-| <client_type>.rss.client.netty.client.receive.buffer        | 0       | 
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and 
send buffer should be latency * network_bandwidth. Assuming latency = 1ms, 
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the 
operating system automatically estimates the receive buffer size based on 
default settings. |
-| <client_type>.rss.client.netty.client.send.buffer           | 0       | Send 
buffer size (SO_SNDBUF).                                                        
                                                                                
                                                                                
                                                                               |
+| Property Name                                                  | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
    |
+|----------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <client_type>.rss.client.type                                  | GRPC    | 
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the client side for better stability and 
performance.                                                                    
                                                                                
                |
+| <client_type>.rss.client.netty.io.mode                         | NIO     | 
Netty EventLoopGroup backend, available options: NIO, EPOLL.                    
                                                                                
                                                                                
                                                                                
    |
+| <client_type>.rss.client.netty.client.connection.timeout.ms    | 600000  | 
Connection active timeout.                                                      
                                                                                
                                                                                
                                                                                
    |
+| <client_type>.rss.client.netty.client.threads                  | 0       | 
Number of threads used in the client thread pool. Default is 0, Netty will use 
the number of (available logical cores * 2) as the number of threads.           
                                                                                
                                                                                
     |
+| <client_type>.rss.client.netty.client.prefer.direct.bufs       | true    | 
If true, we will prefer allocating off-heap byte buffers within Netty.          
                                                                                
                                                                                
                                                                                
    |
+| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true    | 
If true, we will use PooledByteBufAllocator to allocate byte buffers within 
Netty, otherwise we'll use UnpooledByteBufAllocator.                            
                                                                                
                                                                                
        |
+| <client_type>.rss.client.netty.client.connections.per.peer     | 2       | 
Suppose there are 100 executors, 
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer 
will establish a total of (100 * 2) connections with multiple clients.          
                                                                                
                                                   |
+| <client_type>.rss.client.netty.client.receive.buffer           | 0       | 
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and 
send buffer should be latency * network_bandwidth. Assuming latency = 1ms, 
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the 
operating system automatically estimates the receive buffer size based on 
default settings. |
+| <client_type>.rss.client.netty.client.send.buffer              | 0       | 
Send buffer size (SO_SNDBUF).                                                   
                                                                                
                                                                                
                                                                                
    |

Reply via email to