This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d0c91a74 [#1773] improvement(server): Reduce chunkSize of gRPC 
Netty's PooledByteBufAllocator to reduce memory usage (#1780)
3d0c91a74 is described below

commit 3d0c91a74dad5cc5b5d436b3442d01cb7304986c
Author: RickyMa <[email protected]>
AuthorDate: Thu Jun 13 15:59:22 2024 +0800

    [#1773] improvement(server): Reduce chunkSize of gRPC Netty's 
PooledByteBufAllocator to reduce memory usage (#1780)
    
    ### What changes were proposed in this pull request?
    
    Introduce configs to reduce the chunkSize of the gRPC internal Netty's 
PooledByteBufAllocator to reduce memory usage on the server-side.
    When enabling Netty, the changes for gRPC internal Netty's configs will be 
as follows:
    `io.netty.allocator.pageSize`: 8192 -> 4096 (bytes)
    `io.netty.allocator.maxOrder`: 11 -> 3
    `io.netty.allocator.smallCacheSize`: 128 -> 1024
    Note: `chunkSize` = `pageSize` * 2^`maxOrder`,
    Thus, the `io.netty.allocator.chunkSize`: 2MB -> 32KB
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1773.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR will not change anything when using GRPC. It will only take 
effect when using GRPC_NETTY.
    Adds configs: `rss.rpc.netty.pageSize`, `rss.rpc.netty.maxOrder`, 
`rss.rpc.netty.smallCacheSize`
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../apache/uniffle/common/config/RssBaseConf.java  | 24 ++++++++++++++++++++++
 .../org/apache/uniffle/common/rpc/GrpcServer.java  | 19 +++++++++++++++--
 docs/client_guide/client_guide.md                  |  4 ++++
 docs/server_guide.md                               |  3 +++
 4 files changed, 48 insertions(+), 2 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 436f803ed..039d3c831 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -52,6 +52,30 @@ public class RssBaseConf extends RssConf {
           .defaultValue(true)
           .withDescription("If enable metrics for rpc connection");
 
+  public static final ConfigOption<Integer> RPC_NETTY_PAGE_SIZE =
+      ConfigOptions.key("rss.rpc.netty.pageSize")
+          .intType()
+          .defaultValue(4096)
+          .withDescription(
+              "The value of pageSize for PooledByteBufAllocator when using 
gRPC internal Netty on the server-side. "
+                  + "This configuration will only take effect when 
rss.rpc.server.type is set to GRPC_NETTY.");
+
+  public static final ConfigOption<Integer> RPC_NETTY_MAX_ORDER =
+      ConfigOptions.key("rss.rpc.netty.maxOrder")
+          .intType()
+          .defaultValue(3)
+          .withDescription(
+              "The value of maxOrder for PooledByteBufAllocator when using 
gRPC internal Netty on the server-side. "
+                  + "This configuration will only take effect when 
rss.rpc.server.type is set to GRPC_NETTY.");
+
+  public static final ConfigOption<Integer> RPC_NETTY_SMALL_CACHE_SIZE =
+      ConfigOptions.key("rss.rpc.netty.smallCacheSize")
+          .intType()
+          .defaultValue(1024)
+          .withDescription(
+              "The value of smallCacheSize for PooledByteBufAllocator when 
using gRPC internal Netty on the server-side. "
+                  + "This configuration will only take effect when 
rss.rpc.server.type is set to GRPC_NETTY.");
+
   public static final ConfigOption<Integer> JETTY_HTTP_PORT =
       ConfigOptions.key("rss.jetty.http.port")
           .intType()
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index cd7f88952..cf1f05fea 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -35,6 +35,9 @@ import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.ServerInterceptor;
 import io.grpc.ServerInterceptors;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.GrpcNettyUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -93,10 +97,21 @@ public class GrpcServer implements ServerInterface {
   private Server buildGrpcServer(int serverPort) {
     boolean isMetricsEnabled = 
rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
     long maxInboundMessageSize = 
rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
+    ServerType serverType = rssConf.get(RssBaseConf.RPC_SERVER_TYPE);
+    int pageSize = rssConf.getInteger(RssBaseConf.RPC_NETTY_PAGE_SIZE);
+    int maxOrder = rssConf.getInteger(RssBaseConf.RPC_NETTY_MAX_ORDER);
+    int smallCacheSize = 
rssConf.getInteger(RssBaseConf.RPC_NETTY_SMALL_CACHE_SIZE);
+    PooledByteBufAllocator pooledByteBufAllocator =
+        serverType == ServerType.GRPC
+            ? GrpcNettyUtils.createPooledByteBufAllocator(true, 0, 0, 0, 0)
+            : GrpcNettyUtils.createPooledByteBufAllocatorWithSmallCacheOnly(
+                true, 0, pageSize, maxOrder, smallCacheSize);
     ServerBuilder<?> builder =
-        ServerBuilder.forPort(serverPort)
+        NettyServerBuilder.forPort(serverPort)
             .executor(pool)
-            .maxInboundMessageSize((int) maxInboundMessageSize);
+            .maxInboundMessageSize((int) maxInboundMessageSize)
+            .withOption(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
+            .withChildOption(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
     if (isMetricsEnabled) {
       builder.addTransportFilter(new 
MonitoringServerTransportFilter(grpcMetrics));
     }
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index 28e6ca88c..85bb326f3 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -56,6 +56,10 @@ The important configuration of client is listed as 
following. These configuratio
 | <client_type>.rss.client.max.concurrency.of.per-partition.write | -          
                            | The maximum number of files that can be written 
concurrently to a single partition is determined. This value will only be 
respected by the remote shuffle server if it is greater than 0.                 
                                                                                
                                                                                
                      [...]
 | <client_type>.rss.client.rpc.timeout.ms                         | 60000      
                            | Timeout in milliseconds for RPC calls.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | <client_type>.rss.client.rpc.maxAttempts                        | 3          
                            | When we fail to send RPC calls, we will retry for 
maxAttempts times.                                                              
                                                                                
                                                                                
                                                                                
              [...]
+| <client_type>.rss.client.rpc.netty.pageSize                     | 4096       
                            | The value of pageSize for PooledByteBufAllocator 
when using gRPC internal Netty on the client-side. This configuration will only 
take effect when rss.rpc.server.type is set to GRPC_NETTY.                      
                                                                                
                                                                                
               [...]
+| <client_type>.rss.client.rpc.netty.maxOrder                     | 3          
                            | The value of maxOrder for PooledByteBufAllocator 
when using gRPC internal Netty on the client-side. This configuration will only 
take effect when rss.rpc.server.type is set to GRPC_NETTY.                      
                                                                                
                                                                                
               [...]
+| <client_type>.rss.client.rpc.netty.smallCacheSize               | 1024       
                            | The value of smallCacheSize for 
PooledByteBufAllocator when using gRPC internal Netty on the client-side. This 
configuration will only take effect when rss.rpc.server.type is set to 
GRPC_NETTY.                                                                     
                                                                                
                                          [...]
+
 Notice:
 
 1. `<client_type>` should be `mapreduce` `tez` or `spark`
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 4fecff93b..d2d5e968c 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -74,6 +74,9 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.coordinator.quorum                                   | -                 
                                                     | Coordinator quorum       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | rss.rpc.server.type                                      | GRPC              
                                                     | Shuffle server type, 
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the server side for better stability and 
performance.                                                                    
                                                                                
                              [...]
 | rss.rpc.server.port                                      | 19999             
                                                     | RPC port for Shuffle 
server, if set zero, grpc server start on random port.                          
                                                                                
                                                                                
                                                                                
                  [...]
+| rss.rpc.netty.pageSize                                   | 4096              
                                                     | The value of pageSize 
for PooledByteBufAllocator when using gRPC internal Netty on the server-side. 
This configuration will only take effect when rss.rpc.server.type is set to 
GRPC_NETTY.                                                                     
                                                                                
                       [...]
+| rss.rpc.netty.maxOrder                                   | 3                 
                                                     | The value of maxOrder 
for PooledByteBufAllocator when using gRPC internal Netty on the server-side. 
This configuration will only take effect when rss.rpc.server.type is set to 
GRPC_NETTY.                                                                     
                                                                                
                       [...]
+| rss.rpc.netty.smallCacheSize                             | 1024              
                                                     | The value of 
smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the 
server-side. This configuration will only take effect when rss.rpc.server.type 
is set to GRPC_NETTY.                                                           
                                                                                
                           [...]
 | rss.jetty.http.port                                      | 19998             
                                                     | Http port for Shuffle 
server                                                                          
                                                                                
                                                                                
                                                                                
                 [...]
 | rss.server.netty.port                                    | -1                
                                                     | Netty port for Shuffle 
server, if set zero, Netty server start on random port.                         
                                                                                
                                                                                
                                                                                
                [...]
 | rss.server.netty.epoll.enable                            | false             
                                                     | Whether to enable epoll 
model with Netty server.                                                        
                                                                                
                                                                                
                                                                                
               [...]

Reply via email to