This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e3ec90f5c [#1802] improvement(netty): Allow sharing Netty's memory 
allocators (#1803)
e3ec90f5c is described below

commit e3ec90f5ce45fbc5c1ef07ce1e8889ddb0c9a9a9
Author: RickyMa <[email protected]>
AuthorDate: Tue Jun 18 21:03:30 2024 +0800

    [#1802] improvement(netty): Allow sharing Netty's memory allocators (#1803)
    
    ### What changes were proposed in this pull request?
    
    Introducing shared ByteBuf allocators.
    This feature can be enabled via the 
`rss.client.netty.client.shared.allocator.enabled` configuration.
    
    If enabled, then only three ByteBuf allocators are created:
    one PooledByteBufAllocator where caching is allowed, one 
PooledByteBufAllocator where it is not, and one UnpooledByteBufAllocator. When 
disabled, a new allocator is created for each transport client.
    
    We set the default value to true to reduce memory usage; refer to 
https://github.com/apache/spark/pull/23278.
    
    ### Why are the changes needed?
    
    For: https://github.com/apache/incubator-uniffle/issues/1802.
    
    ### Does this PR introduce _any_ user-facing change?
    
    The `rss.client.netty.client.shared.allocator.enabled` configuration is 
introduced.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 .../uniffle/common/config/RssClientConf.java       | 10 +++
 .../netty/client/TransportClientFactory.java       | 30 ++++++---
 .../uniffle/common/netty/client/TransportConf.java |  4 ++
 .../uniffle/common/netty/protocol/Decoders.java    |  2 +-
 .../apache/uniffle/common/util/GrpcNettyUtils.java |  4 +-
 .../org/apache/uniffle/common/util/NettyUtils.java | 71 ++++++++++++++++++----
 docs/client_guide/client_guide.md                  | 23 +++----
 .../uniffle/client/impl/grpc/GrpcClient.java       |  4 +-
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    |  4 +-
 .../server/buffer/AbstractShuffleBuffer.java       |  2 +-
 .../apache/uniffle/server/netty/StreamServer.java  |  4 +-
 11 files changed, 117 insertions(+), 41 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 21350c6fd..570ea50c0 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -170,6 +170,16 @@ public class RssClientConf {
           .withDescription(
               "If true, we will use PooledByteBufAllocator to allocate byte 
buffers within Netty, otherwise we'll use UnpooledByteBufAllocator.");
 
+  public static final ConfigOption<Boolean> 
NETTY_CLIENT_SHARED_ALLOCATOR_ENABLED =
+      ConfigOptions.key("rss.client.netty.client.shared.allocator.enabled")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "A flag indicating whether to share the ByteBuf allocators 
between the different Netty channels when enabling Netty. "
+                  + "If enabled then only three ByteBuf allocators are 
created: "
+                  + "one PooledByteBufAllocator where caching is allowed, one 
PooledByteBufAllocator where not and one UnpooledByteBufAllocator. "
+                  + "When disabled, a new allocator is created for each 
transport client.");
+
   public static final ConfigOption<Integer> 
NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER =
       ConfigOptions.key("rss.client.netty.client.connections.per.peer")
           .intType()
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index 6a3997023..4108b486b 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -73,7 +73,7 @@ public class TransportClientFactory implements Closeable {
 
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
-  private AbstractByteBufAllocator byteBufAllocator;
+  private ByteBufAllocator byteBufAllocator;
 
   public TransportClientFactory(TransportContext context) {
     this.context = Objects.requireNonNull(context);
@@ -85,11 +85,27 @@ public class TransportClientFactory implements Closeable {
     IOMode ioMode = conf.ioMode();
     this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
     this.workerGroup = NettyUtils.createEventLoop(ioMode, 
conf.clientThreads(), "netty-rpc-client");
-    this.byteBufAllocator =
-        conf.isPooledAllocatorEnabled()
-            ? NettyUtils.createPooledByteBufAllocator(
-                conf.preferDirectBufs(), false, conf.clientThreads())
-            : 
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
+    if (conf.isSharedAllocatorEnabled()) {
+      this.byteBufAllocator =
+          conf.isPooledAllocatorEnabled()
+              ? NettyUtils.getSharedPooledByteBufAllocator(
+                  conf.preferDirectBufs(), false, conf.clientThreads())
+              : 
NettyUtils.getSharedUnpooledByteBufAllocator(conf.preferDirectBufs());
+    } else {
+      this.byteBufAllocator =
+          conf.isPooledAllocatorEnabled()
+              ? NettyUtils.createPooledByteBufAllocator(
+                  conf.preferDirectBufs(), false, conf.clientThreads())
+              : 
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "isPooledAllocatorEnabled={}, isSharedAllocatorEnabled={}, 
preferDirectBufs={}, byteBufAllocator={}",
+          conf.isPooledAllocatorEnabled(),
+          conf.isSharedAllocatorEnabled(),
+          conf.preferDirectBufs(),
+          byteBufAllocator);
+    }
   }
 
   public TransportClient createClient(String remoteHost, int remotePort, int 
partitionId)
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
index 684645080..64a6e912d 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
@@ -57,6 +57,10 @@ public class TransportConf {
     return rssConf.get(RssClientConf.NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED);
   }
 
+  public boolean isSharedAllocatorEnabled() {
+    return rssConf.get(RssClientConf.NETTY_CLIENT_SHARED_ALLOCATOR_ENABLED);
+  }
+
   public int receiveBuf() {
     return rssConf.get(RssClientConf.NETTY_CLIENT_RECEIVE_BUFFER);
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
index b8c687ce7..fc7004880 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
@@ -47,7 +47,7 @@ public class Decoders {
     long crc = byteBuf.readLong();
     long taskAttemptId = byteBuf.readLong();
     int dataLength = byteBuf.readInt();
-    ByteBuf data = 
NettyUtils.getNettyBufferAllocator().directBuffer(dataLength);
+    ByteBuf data = 
NettyUtils.getSharedUnpooledByteBufAllocator(true).directBuffer(dataLength);
     data.writeBytes(byteBuf, dataLength);
     int lengthOfShuffleServers = byteBuf.readInt();
     List<ShuffleServerInfo> serverInfos = Lists.newArrayList();
diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
index 8408001b9..93a7949d7 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
@@ -35,9 +35,7 @@ public class GrpcNettyUtils {
       int maxOrder,
       int smallCacheSize,
       int normalCacheSize) {
-    if (numCores == 0) {
-      numCores = Runtime.getRuntime().availableProcessors();
-    }
+    numCores = NettyUtils.defaultNumThreads(numCores);
     if (pageSize == 0) {
       pageSize = PooledByteBufAllocator.defaultPageSize();
     }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
index 468d2a7a3..59668b3b1 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
@@ -18,8 +18,8 @@
 package org.apache.uniffle.common.util;
 
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
-import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
@@ -43,6 +43,13 @@ public class NettyUtils {
 
   private static final long MAX_DIRECT_MEMORY_IN_BYTES = 
PlatformDependent.maxDirectMemory();
 
+  /** Specifies an upper bound on the number of Netty threads that Uniffle 
requires by default. */
+  private static int MAX_DEFAULT_NETTY_THREADS = 8;
+
+  private static final AtomicReferenceArray<PooledByteBufAllocator>
+      SHARED_POOLED_BYTE_BUF_ALLOCATOR = new AtomicReferenceArray<>(2);
+  private static volatile UnpooledByteBufAllocator 
sharedUnpooledByteBufAllocator;
+
   /** Creates a Netty EventLoopGroup based on the IOMode. */
   public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, 
String threadPrefix) {
     ThreadFactory threadFactory = 
ThreadUtils.getNettyThreadFactory(threadPrefix);
@@ -69,11 +76,59 @@ public class NettyUtils {
     }
   }
 
-  public static PooledByteBufAllocator createPooledByteBufAllocator(
+  /**
+   * Returns the default number of threads for both the Netty client and 
server thread pools. If
+   * numUsableCores is 0, we will use Runtime get an approximate number of 
available cores.
+   */
+  public static int defaultNumThreads(int numUsableCores) {
+    final int availableCores;
+    if (numUsableCores > 0) {
+      availableCores = numUsableCores;
+    } else {
+      availableCores = Runtime.getRuntime().availableProcessors();
+    }
+    return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
+  }
+
+  /**
+   * Returns the lazily created shared pooled ByteBuf allocator for the 
specified allowCache
+   * parameter value.
+   */
+  public static PooledByteBufAllocator getSharedPooledByteBufAllocator(
       boolean allowDirectBufs, boolean allowCache, int numCores) {
-    if (numCores == 0) {
-      numCores = Runtime.getRuntime().availableProcessors();
+    final int index = allowCache ? 0 : 1;
+    PooledByteBufAllocator allocator = 
SHARED_POOLED_BYTE_BUF_ALLOCATOR.get(index);
+    if (allocator == null) {
+      synchronized (NettyUtils.class) {
+        allocator = SHARED_POOLED_BYTE_BUF_ALLOCATOR.get(index);
+        if (allocator == null) {
+          allocator = createPooledByteBufAllocator(allowDirectBufs, 
allowCache, numCores);
+          SHARED_POOLED_BYTE_BUF_ALLOCATOR.set(index, allocator);
+        }
+      }
     }
+    return allocator;
+  }
+
+  /**
+   * Returns the lazily created shared un-pooled ByteBuf allocator for the 
specified allowCache
+   * parameter value.
+   */
+  public static synchronized UnpooledByteBufAllocator 
getSharedUnpooledByteBufAllocator(
+      boolean allowDirectBufs) {
+    if (sharedUnpooledByteBufAllocator == null) {
+      synchronized (NettyUtils.class) {
+        if (sharedUnpooledByteBufAllocator == null) {
+          sharedUnpooledByteBufAllocator = 
createUnpooledByteBufAllocator(allowDirectBufs);
+        }
+      }
+    }
+    return sharedUnpooledByteBufAllocator;
+  }
+
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs, boolean allowCache, int numCores) {
+    numCores = defaultNumThreads(numCores);
     return new PooledByteBufAllocator(
         allowDirectBufs && PlatformDependent.directBufferPreferred(),
         Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
@@ -117,14 +172,6 @@ public class NettyUtils {
     return String.format("[%s -> %s]", channel.localAddress(), 
channel.remoteAddress());
   }
 
-  private static class AllocatorHolder {
-    private static final AbstractByteBufAllocator INSTANCE = 
createUnpooledByteBufAllocator(true);
-  }
-
-  public static AbstractByteBufAllocator getNettyBufferAllocator() {
-    return AllocatorHolder.INSTANCE;
-  }
-
   public static UnpooledByteBufAllocator 
createUnpooledByteBufAllocator(boolean preferDirect) {
     return new UnpooledByteBufAllocator(preferDirect);
   }
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index 85bb326f3..b33aebc1b 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -100,14 +100,15 @@ spark.rss.data.replica.read 2
 ```
 
 ### Netty Setting
-| Property Name                                                  | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
    |
-|----------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| <client_type>.rss.client.type                                  | GRPC    | 
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the client side for better stability and 
performance.                                                                    
                                                                                
                |
-| <client_type>.rss.client.netty.io.mode                         | NIO     | 
Netty EventLoopGroup backend, available options: NIO, EPOLL.                    
                                                                                
                                                                                
                                                                                
    |
-| <client_type>.rss.client.netty.client.connection.timeout.ms    | 600000  | 
Connection active timeout.                                                      
                                                                                
                                                                                
                                                                                
    |
-| <client_type>.rss.client.netty.client.threads                  | 0       | 
Number of threads used in the client thread pool. Default is 0, Netty will use 
the number of (available logical cores * 2) as the number of threads.           
                                                                                
                                                                                
     |
-| <client_type>.rss.client.netty.client.prefer.direct.bufs       | true    | 
If true, we will prefer allocating off-heap byte buffers within Netty.          
                                                                                
                                                                                
                                                                                
    |
-| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true    | 
If true, we will use PooledByteBufAllocator to allocate byte buffers within 
Netty, otherwise we'll use UnpooledByteBufAllocator.                            
                                                                                
                                                                                
        |
-| <client_type>.rss.client.netty.client.connections.per.peer     | 2       | 
Suppose there are 100 executors, 
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer 
will establish a total of (100 * 2) connections with multiple clients.          
                                                                                
                                                   |
-| <client_type>.rss.client.netty.client.receive.buffer           | 0       | 
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and 
send buffer should be latency * network_bandwidth. Assuming latency = 1ms, 
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the 
operating system automatically estimates the receive buffer size based on 
default settings. |
-| <client_type>.rss.client.netty.client.send.buffer              | 0       | 
Send buffer size (SO_SNDBUF).                                                   
                                                                                
                                                                                
                                                                                
    |
+| Property Name                                                  | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                |
+|----------------------------------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <client_type>.rss.client.type                                  | GRPC    | 
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the client side for better stability and 
performance.                                                                    
                                                                                
                                                            |
+| <client_type>.rss.client.netty.io.mode                         | NIO     | 
Netty EventLoopGroup backend, available options: NIO, EPOLL.                    
                                                                                
                                                                                
                                                                                
                                                |
+| <client_type>.rss.client.netty.client.connection.timeout.ms    | 600000  | 
Connection active timeout.                                                      
                                                                                
                                                                                
                                                                                
                                                |
+| <client_type>.rss.client.netty.client.threads                  | 0       | 
Number of threads used in the client thread pool. Default is 0, Netty will use 
the number of (available logical cores * 2) as the number of threads.           
                                                                                
                                                                                
                                                 |
+| <client_type>.rss.client.netty.client.prefer.direct.bufs       | true    | 
If true, we will prefer allocating off-heap byte buffers within Netty.          
                                                                                
                                                                                
                                                                                
                                                |
+| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true    | 
If true, we will use PooledByteBufAllocator to allocate byte buffers within 
Netty, otherwise we'll use UnpooledByteBufAllocator.                            
                                                                                
                                                                                
                                                    |
+| <client_type>.rss.client.netty.client.shared.allocator.enabled | true    | A 
flag indicating whether to share the ByteBuf allocators between the different 
Netty channels when enabling Netty. If enabled then only three ByteBuf 
allocators are created: one PooledByteBufAllocator where caching is allowed, 
one PooledByteBufAllocator where not and one UnpooledByteBufAllocator. When 
disabled, a new allocator is created for each transport client. |
+| <client_type>.rss.client.netty.client.connections.per.peer     | 2       | 
Suppose there are 100 executors, 
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer 
will establish a total of (100 * 2) connections with multiple clients.          
                                                                                
                                                                                
               |
+| <client_type>.rss.client.netty.client.receive.buffer           | 0       | 
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and 
send buffer should be latency * network_bandwidth. Assuming latency = 1ms, 
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the 
operating system automatically estimates the receive buffer size based on 
default settings.                                             |
+| <client_type>.rss.client.netty.client.send.buffer              | 0       | 
Send buffer size (SO_SNDBUF).                                                   
                                                                                
                                                                                
                                                                                
                                                |
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
index c3bb5f883..3539b9e90 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
 
 import io.grpc.ManagedChannel;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-import io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBufAllocator;
 import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +76,7 @@ public abstract class GrpcClient {
     this.channel = channel;
   }
 
-  protected AbstractByteBufAllocator createByteBufAllocator(
+  protected ByteBufAllocator createByteBufAllocator(
       int pageSize, int maxOrder, int smallCacheSize) {
     return GrpcNettyUtils.createPooledByteBufAllocator(true, 0, 0, 0, 0);
   }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 1c46077ad..806fd3c21 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +108,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
   }
 
   @Override
-  protected AbstractByteBufAllocator createByteBufAllocator(
+  protected ByteBufAllocator createByteBufAllocator(
       int pageSize, int maxOrder, int smallCacheSize) {
     LOG.info(
         "ShuffleServerGrpcNettyClient is initialized - host:{}, gRPC port:{}, 
netty port:{}, maxRetryAttempts:{}, usePlaintext:{}, pageSize:{}, maxOrder:{}, 
smallCacheSize={}",
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 15197bc86..fd5a62280 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -93,7 +93,7 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
       if (!bufferSegments.isEmpty()) {
         CompositeByteBuf byteBuf =
             new CompositeByteBuf(
-                NettyUtils.getNettyBufferAllocator(),
+                NettyUtils.getSharedUnpooledByteBufAllocator(true),
                 true,
                 Constants.COMPOSITE_BYTE_BUF_MAX_COMPONENTS);
         // copy result data
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java 
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index f8410fcc9..23ccaff38 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -155,9 +155,9 @@ public class StreamServer implements ServerInterface {
             })
         .option(ChannelOption.SO_BACKLOG, backlogSize)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
-        .option(ChannelOption.ALLOCATOR, NettyUtils.getNettyBufferAllocator())
+        .option(ChannelOption.ALLOCATOR, 
NettyUtils.getSharedUnpooledByteBufAllocator(true))
         .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
-        .childOption(ChannelOption.ALLOCATOR, 
NettyUtils.getNettyBufferAllocator())
+        .childOption(ChannelOption.ALLOCATOR, 
NettyUtils.getSharedUnpooledByteBufAllocator(true))
         .childOption(ChannelOption.TCP_NODELAY, true)
         .childOption(ChannelOption.SO_KEEPALIVE, true);
 

Reply via email to