This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a318eb43a [CELEBORN-1815] Support UnpooledByteBufAllocator
a318eb43a is described below

commit a318eb43aba0f2a767f8eb5ca0c3c8c35bcd2da6
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jan 2 20:54:34 2025 +0800

    [CELEBORN-1815] Support UnpooledByteBufAllocator
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a configuration 
`celeborn.network.memory.allocator.pooled` to allow users to disable 
`PooledByteBufAllocator` globally and always use `UnpooledByteBufAllocator`.
    
    ### Why are the changes needed?
    
    In some extreme cases, Netty's `PooledByteBufAllocator` might hold tons 
of 4MiB chunks while only a small portion of their capacity is used by real 
data (see https://github.com/apache/celeborn/pull/3018). For scenarios where 
stability is more important than performance, it's desirable to allow users to 
disable the `PooledByteBufAllocator` globally.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Add a new feature, disabled by default.
    
    ### How was this patch tested?
    
    Existing unit tests pass, ensuring correctness. The performance and memory 
impact still needs to be verified on a production-scale cluster.
    
    Closes #3043 from pan3793/CELEBORN-1815.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../flink/network/FlinkTransportClientFactory.java |   2 +-
 .../network/client/TransportClientFactory.java     |  13 ++-
 .../common/network/server/TransportServer.java     |   6 +-
 .../common/network/util/NettyMemoryMetrics.java    | 126 ++++++++++++---------
 .../celeborn/common/network/util/NettyUtils.java   |  80 +++++++------
 .../org/apache/celeborn/common/CelebornConf.scala  |  14 +++
 .../deploy/worker/memory/MemoryManager.java        |   4 +-
 .../deploy/worker/memory/ReadBufferDispatcher.java |  11 +-
 .../deploy/worker/storage/PartitionDataWriter.java |   8 +-
 .../worker/storage/PartitionFilesSorter.java       |   6 +-
 .../service/deploy/worker/storage/Flusher.scala    |  10 +-
 .../deploy/worker/storage/StorageManager.scala     |   6 +-
 .../storage/PartitionDataWriterSuiteUtils.java     |   3 +-
 .../local/DiskMapPartitionDataWriterSuiteJ.java    |   2 +-
 .../local/DiskReducePartitionDataWriterSuiteJ.java |   4 +-
 .../memory/MemoryPartitionFilesSorterSuiteJ.java   |   6 +-
 .../MemoryReducePartitionDataWriterSuiteJ.java     |   8 +-
 17 files changed, 176 insertions(+), 133 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index 0bfaaf99e..244d8703e 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -45,7 +45,7 @@ public class FlinkTransportClientFactory extends 
TransportClientFactory {
       TransportContext context, List<TransportClientBootstrap> bootstraps, int 
bufferSizeBytes) {
     super(context, bootstraps);
     bufferSuppliers = JavaUtils.newConcurrentHashMap();
-    this.pooledAllocator = new UnpooledByteBufAllocator(true);
+    this.allocator = new UnpooledByteBufAllocator(true);
     this.bufferSizeBytes = bufferSizeBytes;
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 647ef280b..fac64490b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -93,7 +93,7 @@ public class TransportClientFactory implements Closeable {
   private final int sendBuf;
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
-  protected ByteBufAllocator pooledAllocator;
+  protected ByteBufAllocator allocator;
   private final int maxClientConnectRetries;
   private final int maxClientConnectRetryWaitTimeMs;
 
@@ -115,9 +115,12 @@ public class TransportClientFactory implements Closeable {
     logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode, 
conf.clientThreads());
     this.workerGroup =
         NettyUtils.createEventLoop(ioMode, conf.clientThreads(), 
conf.getModuleName() + "-client");
-    this.pooledAllocator =
-        NettyUtils.getPooledByteBufAllocator(
-            conf, context.getSource(), false, conf.clientThreads());
+    // Always disable thread-local cache when creating pooled ByteBuf 
allocator for TransportClients
+    // because the ByteBufs are allocated by the event loop thread, but 
released by the executor
+    // thread rather than the event loop thread. Those thread-local caches 
actually delay the
+    // recycling of buffers, leading to larger memory usage.
+    this.allocator =
+        NettyUtils.getByteBufAllocator(conf, context.getSource(), false, 
conf.clientThreads());
     this.maxClientConnectRetries = conf.maxIORetries();
     this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs();
   }
@@ -268,7 +271,7 @@ public class TransportClientFactory implements Closeable {
         .option(ChannelOption.TCP_NODELAY, true)
         .option(ChannelOption.SO_KEEPALIVE, true)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
-        .option(ChannelOption.ALLOCATOR, pooledAllocator);
+        .option(ChannelOption.ALLOCATOR, allocator);
 
     if (receiveBuf > 0) {
       bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index 672a450bc..1808a000c 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -91,8 +91,8 @@ public class TransportServer implements Closeable {
     EventLoopGroup workerGroup =
         NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
conf.getModuleName() + "-server");
 
-    PooledByteBufAllocator allocator =
-        NettyUtils.getPooledByteBufAllocator(conf, source, true, 
conf.serverThreads());
+    ByteBufAllocator allocator =
+        NettyUtils.getByteBufAllocator(conf, source, true, 
conf.serverThreads());
 
     bootstrap =
         new ServerBootstrap()
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
index 440bcd709..59bba341b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
@@ -23,19 +23,17 @@ import java.util.*;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.netty.buffer.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.metrics.source.AbstractSource;
 
-/** A Netty memory metrics class to collect metrics from Netty 
PooledByteBufAllocator. */
+/** A Netty memory metrics class to collect metrics from Netty 
ByteBufAllocator. */
 public class NettyMemoryMetrics {
   private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-  private final PooledByteBufAllocator pooledAllocator;
+  private final ByteBufAllocator allocator;
 
   private final boolean verboseMetricsEnabled;
 
@@ -69,71 +67,87 @@ public class NettyMemoryMetrics {
   }
 
   public NettyMemoryMetrics(
-      PooledByteBufAllocator pooledAllocator,
+      ByteBufAllocator allocator,
       String metricPrefix,
       boolean verboseMetricsEnabled,
       AbstractSource source,
       Map<String, String> labels) {
-    this.pooledAllocator = pooledAllocator;
+    this.allocator = allocator;
     this.metricPrefix = metricPrefix;
     this.verboseMetricsEnabled = verboseMetricsEnabled;
     this.source = source;
     this.labels = labels;
 
-    registerMetrics(this.pooledAllocator);
+    registerMetrics();
   }
 
-  private void registerMetrics(PooledByteBufAllocator allocator) {
-    PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric();
-
+  private void registerMetrics() {
     // Register general metrics.
     if (source != null) {
-      logger.debug("setup netty metrics");
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "usedHeapMemory"),
-          labels,
-          pooledAllocatorMetric::usedHeapMemory);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "usedDirectMemory"),
-          labels,
-          pooledAllocatorMetric::usedDirectMemory);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "numHeapArenas"),
-          labels,
-          pooledAllocatorMetric::numHeapArenas);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "numDirectArenas"),
-          labels,
-          pooledAllocatorMetric::numDirectArenas);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "tinyCacheSize"),
-          labels,
-          pooledAllocatorMetric::tinyCacheSize);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "smallCacheSize"),
-          labels,
-          pooledAllocatorMetric::smallCacheSize);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "normalCacheSize"),
-          labels,
-          pooledAllocatorMetric::normalCacheSize);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "numThreadLocalCaches"),
-          labels,
-          pooledAllocatorMetric::numThreadLocalCaches);
-      source.addGauge(
-          MetricRegistry.name(metricPrefix, "chunkSize"), labels, 
pooledAllocatorMetric::chunkSize);
-      if (verboseMetricsEnabled) {
-        int directArenaIndex = 0;
-        for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
-          registerArenaMetric(metric, "directArena" + directArenaIndex);
-          directArenaIndex++;
-        }
+      if (allocator instanceof UnpooledByteBufAllocator) {
+        logger.debug("Setup netty metrics for UnpooledByteBufAllocator");
+        ByteBufAllocatorMetric unpooledMetric = ((UnpooledByteBufAllocator) 
allocator).metric();
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "usedHeapMemory"),
+            labels,
+            unpooledMetric::usedHeapMemory);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "usedDirectMemory"),
+            labels,
+            unpooledMetric::usedDirectMemory);
+      } else if (allocator instanceof PooledByteBufAllocator) {
+        logger.debug("Setup netty metrics for PooledByteBufAllocator");
+        PooledByteBufAllocatorMetric pooledAllocatorMetric =
+            ((PooledByteBufAllocator) allocator).metric();
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "usedHeapMemory"),
+            labels,
+            pooledAllocatorMetric::usedHeapMemory);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "usedDirectMemory"),
+            labels,
+            pooledAllocatorMetric::usedDirectMemory);
 
-        int heapArenaIndex = 0;
-        for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
-          registerArenaMetric(metric, "heapArena" + heapArenaIndex);
-          heapArenaIndex++;
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "numHeapArenas"),
+            labels,
+            pooledAllocatorMetric::numHeapArenas);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "numDirectArenas"),
+            labels,
+            pooledAllocatorMetric::numDirectArenas);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "tinyCacheSize"),
+            labels,
+            pooledAllocatorMetric::tinyCacheSize);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "smallCacheSize"),
+            labels,
+            pooledAllocatorMetric::smallCacheSize);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "normalCacheSize"),
+            labels,
+            pooledAllocatorMetric::normalCacheSize);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "numThreadLocalCaches"),
+            labels,
+            pooledAllocatorMetric::numThreadLocalCaches);
+        source.addGauge(
+            MetricRegistry.name(metricPrefix, "chunkSize"),
+            labels,
+            pooledAllocatorMetric::chunkSize);
+        if (verboseMetricsEnabled) {
+          int directArenaIndex = 0;
+          for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
+            registerArenaMetric(metric, "directArena" + directArenaIndex);
+            directArenaIndex++;
+          }
+
+          int heapArenaIndex = 0;
+          for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
+            registerArenaMetric(metric, "heapArena" + heapArenaIndex);
+            heapArenaIndex++;
+          }
         }
       }
     }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 0a7641247..596b80785 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -23,7 +23,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
@@ -42,9 +44,8 @@ import org.apache.celeborn.common.util.JavaUtils;
 
 /** Utilities for creating various Netty constructs based on whether we're 
using EPOLL or NIO. */
 public class NettyUtils {
-  private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
-      new PooledByteBufAllocator[2];
-  private static ConcurrentHashMap<String, Integer> allocatorsIndex =
+  private static final ByteBufAllocator[] _sharedByteBufAllocator = new 
ByteBufAllocator[2];
+  private static final ConcurrentHashMap<String, Integer> allocatorsIndex =
       JavaUtils.newConcurrentHashMap();
   /** Creates a new ThreadFactory which prefixes each thread with the given 
name. */
   public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
@@ -98,58 +99,69 @@ public class NettyUtils {
   }
 
   /**
-   * Create a pooled ByteBuf allocator but disables the thread-local cache. 
Thread-local caches are
-   * disabled for TransportClients because the ByteBufs are allocated by the 
event loop thread, but
-   * released by the executor thread rather than the event loop thread. Those 
thread-local caches
-   * actually delay the recycling of buffers, leading to larger memory usage.
+   * Create a ByteBufAllocator that respects the parameters
+   *
+   * @param pooled If true, create a PooledByteBufAllocator, otherwise 
UnpooledByteBufAllocator
+   * @param allowDirectBufs If true and platform supports, allocate ByteBuf in 
direct memory,
+   *     otherwise in heap memory.
+   * @param allowCache If true, enable thread-local cache, it only take effect 
for
+   *     PooledByteBufAllocator.
+   * @param numCores Number of heap/direct arenas, 0 means use number of cpu 
cores, it only take
+   *     effect for PooledByteBufAllocator.
    */
-  private static PooledByteBufAllocator createPooledByteBufAllocator(
-      boolean allowDirectBufs, boolean allowCache, int numCores) {
-    if (numCores == 0) {
-      numCores = Runtime.getRuntime().availableProcessors();
+  private static ByteBufAllocator createByteBufAllocator(
+      boolean pooled, boolean allowDirectBufs, boolean allowCache, int 
numCores) {
+    if (pooled) {
+      if (numCores == 0) {
+        numCores = Runtime.getRuntime().availableProcessors();
+      }
+      return new PooledByteBufAllocator(
+          allowDirectBufs && PlatformDependent.directBufferPreferred(),
+          Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
+          Math.min(PooledByteBufAllocator.defaultNumDirectArena(), 
allowDirectBufs ? numCores : 0),
+          PooledByteBufAllocator.defaultPageSize(),
+          PooledByteBufAllocator.defaultMaxOrder(),
+          allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
+          allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
+          allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
+    } else {
+      return new UnpooledByteBufAllocator(
+          allowDirectBufs && PlatformDependent.directBufferPreferred());
     }
-    return new PooledByteBufAllocator(
-        allowDirectBufs && PlatformDependent.directBufferPreferred(),
-        Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
-        Math.min(PooledByteBufAllocator.defaultNumDirectArena(), 
allowDirectBufs ? numCores : 0),
-        PooledByteBufAllocator.defaultPageSize(),
-        PooledByteBufAllocator.defaultMaxOrder(),
-        allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
-        allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
-        allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
   }
 
   /**
    * Returns the lazily created shared pooled ByteBuf allocator for the 
specified allowCache
    * parameter value.
    */
-  public static synchronized PooledByteBufAllocator 
getSharedPooledByteBufAllocator(
+  public static synchronized ByteBufAllocator getSharedByteBufAllocator(
       CelebornConf conf, AbstractSource source, boolean allowCache) {
     final int index = allowCache ? 0 : 1;
-    if (_sharedPooledByteBufAllocator[index] == null) {
-      _sharedPooledByteBufAllocator[index] =
-          createPooledByteBufAllocator(true, allowCache, 
conf.networkAllocatorArenas());
+    if (_sharedByteBufAllocator[index] == null) {
+      _sharedByteBufAllocator[index] =
+          createByteBufAllocator(
+              conf.networkMemoryAllocatorPooled(), true, allowCache, 
conf.networkAllocatorArenas());
       if (source != null) {
         new NettyMemoryMetrics(
-            _sharedPooledByteBufAllocator[index],
+            _sharedByteBufAllocator[index],
             "shared-pool-" + index,
             conf.networkAllocatorVerboseMetric(),
             source,
             Collections.emptyMap());
       }
     }
-    return _sharedPooledByteBufAllocator[index];
+    return _sharedByteBufAllocator[index];
   }
 
-  public static PooledByteBufAllocator getPooledByteBufAllocator(
+  public static ByteBufAllocator getByteBufAllocator(
       TransportConf conf, AbstractSource source, boolean allowCache) {
-    return getPooledByteBufAllocator(conf, source, allowCache, 0);
+    return getByteBufAllocator(conf, source, allowCache, 0);
   }
 
-  public static PooledByteBufAllocator getPooledByteBufAllocator(
+  public static ByteBufAllocator getByteBufAllocator(
       TransportConf conf, AbstractSource source, boolean allowCache, int 
coreNum) {
     if (conf.getCelebornConf().networkShareMemoryAllocator()) {
-      return getSharedPooledByteBufAllocator(
+      return getSharedByteBufAllocator(
           conf.getCelebornConf(),
           source,
           allowCache && 
conf.getCelebornConf().networkMemoryAllocatorAllowCache());
@@ -160,8 +172,12 @@ public class NettyUtils {
     } else {
       arenas = conf.getCelebornConf().networkAllocatorArenas();
     }
-    PooledByteBufAllocator allocator =
-        createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, 
arenas);
+    ByteBufAllocator allocator =
+        createByteBufAllocator(
+            conf.getCelebornConf().networkMemoryAllocatorPooled(),
+            conf.preferDirectBufs(),
+            allowCache,
+            arenas);
     if (source != null) {
       String poolName = "default-netty-pool";
       Map<String, String> labels = new HashMap<>();
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a8cdf8b7d..7280a1cde 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -603,6 +603,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def networkMemoryAllocatorAllowCache: Boolean =
     get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)
 
+  def networkMemoryAllocatorPooled: Boolean =
+    get(NETWORK_MEMORY_ALLOCATOR_POOLED)
+
   def networkAllocatorArenas: Int = 
get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
     Runtime.getRuntime.availableProcessors(),
     2))
@@ -1782,6 +1785,17 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.network.memory.allocator.pooled")
+      .categories("network")
+      .internal
+      .version("0.6.0")
+      .doc("If disabled, always use UnpooledByteBufAllocator for aggressive 
memory reclamation, " +
+        "this is helpful for cases that worker has high memory usage even 
after triming. " +
+        "Disabling would cause performace degression and higher CPU usage.")
+      .booleanConf
+      .createWithDefault(true)
+
   val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
     buildConf("celeborn.network.memory.allocator.share")
       .categories("network")
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index db368aa21..32704cf04 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.LongAdder;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.internal.PlatformDependent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -546,7 +546,7 @@ public class MemoryManager {
     readBufferDispatcher.close();
   }
 
-  public PooledByteBufAllocator getStoragePooledByteBufAllocator() {
+  public ByteBufAllocator getStorageByteBufAllocator() {
     return storageManager.storageBufferAllocator();
   }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index 1646095ec..5c93435c9 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.LongAdder;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class ReadBufferDispatcher {
   private final Logger logger = 
LoggerFactory.getLogger(ReadBufferDispatcher.class);
   private final LinkedBlockingQueue<ReadBufferRequest> requests = new 
LinkedBlockingQueue<>();
   private final MemoryManager memoryManager;
-  private final PooledByteBufAllocator readBufferAllocator;
+  private final ByteBufAllocator readBufferAllocator;
   private final LongAdder allocatedReadBuffers = new LongAdder();
   private final long readBufferAllocationWait;
   @VisibleForTesting public volatile boolean stopFlag = false;
@@ -51,7 +52,7 @@ public class ReadBufferDispatcher {
     long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
     // readBuffer is not a module name, it's a placeholder.
     readBufferAllocator =
-        NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer", 
conf), null, true);
+        NettyUtils.getByteBufAllocator(new TransportConf("readBuffer", conf), 
null, true);
     this.memoryManager = memoryManager;
     dispatcherThread =
         new AtomicReference<>(
@@ -118,8 +119,10 @@ public class ReadBufferDispatcher {
             if (request != null) {
               processBufferRequest(request, buffers);
             } else {
-              // Free buffer pool memory to main direct memory when dispatcher 
is idle.
-              readBufferAllocator.trimCurrentThreadCache();
+              if (readBufferAllocator instanceof PooledByteBufAllocator) {
+                // Free buffer pool memory to main direct memory when 
dispatcher is idle.
+                ((PooledByteBufAllocator) 
readBufferAllocator).trimCurrentThreadCache();
+              }
             }
           } catch (Throwable e) {
             logger.error(e.getMessage(), e);
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 185eeb950..c711752fb 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -31,8 +31,8 @@ import scala.Tuple4;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.roaringbitmap.RoaringBitmap;
@@ -102,7 +102,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
   protected final long memoryFileStorageMaxFileSize;
   protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean();
   protected final String filename;
-  protected PooledByteBufAllocator pooledByteBufAllocator;
+  protected ByteBufAllocator allocator;
   private final PartitionDataWriterContext writerContext;
   private final long localFlusherBufferSize;
   private final long hdfsFlusherBufferSize;
@@ -155,7 +155,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     // Reduce partition data writers support memory storage now
     if (supportInMemory && createFileResult._1() != null) {
       this.memoryFileInfo = createFileResult._1();
-      this.pooledByteBufAllocator = storageManager.storageBufferAllocator();
+      this.allocator = storageManager.storageBufferAllocator();
       this.isMemoryShuffleFile.set(true);
       storageManager.registerMemoryPartitionWriter(this, 
createFileResult._1());
     } else if (createFileResult._2() != null) {
@@ -669,7 +669,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
         flushBuffer = flusher.takeBuffer();
       } else {
         if (flushBuffer == null) {
-          flushBuffer = 
pooledByteBufAllocator.compositeBuffer(Integer.MAX_VALUE);
+          flushBuffer = allocator.compositeBuffer(Integer.MAX_VALUE);
         }
       }
     }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 2cc6c3292..dfc2f3e99 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -211,9 +211,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
                   startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, 
true),
               shuffleChunkSize);
       CompositeByteBuf targetBuffer =
-          MemoryManager.instance()
-              .getStoragePooledByteBufAllocator()
-              .compositeBuffer(Integer.MAX_VALUE);
+          
MemoryManager.instance().getStorageByteBufAllocator().compositeBuffer(Integer.MAX_VALUE);
       ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
           startMapIndex,
           endMapIndex,
@@ -334,7 +332,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
         // because this will affect origin buffer's reference count
         CompositeByteBuf sortedBuffer =
             MemoryManager.instance()
-                .getStoragePooledByteBufAllocator()
+                .getStorageByteBufAllocator()
                 .compositeBuffer(Integer.MAX_VALUE - 1);
         Map<Integer, List<ShuffleBlockInfo>> sortedBlocks = new TreeMap<>();
         int sortedBufferIndex = 0;
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 205ce8bcc..a10059436 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, 
AtomicLongArray}
 
 import scala.util.Random
 
-import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator}
+import io.netty.buffer.{ByteBufAllocator, CompositeByteBuf, 
PooledByteBufAllocator}
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow}
@@ -38,7 +38,7 @@ import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 abstract private[worker] class Flusher(
     val workerSource: AbstractSource,
     val threadCount: Int,
-    val allocator: PooledByteBufAllocator,
+    val allocator: ByteBufAllocator,
     val maxComponents: Int,
     flushTimeMetric: TimeWindow,
     mountPoint: String) extends Logging {
@@ -137,7 +137,7 @@ private[worker] class LocalFlusher(
     workerSource: AbstractSource,
     val deviceMonitor: DeviceMonitor,
     threadCount: Int,
-    allocator: PooledByteBufAllocator,
+    allocator: ByteBufAllocator,
     maxComponents: Int,
     val mountPoint: String,
     val diskType: StorageInfo.Type,
@@ -176,7 +176,7 @@ private[worker] class LocalFlusher(
 final private[worker] class HdfsFlusher(
     workerSource: AbstractSource,
     hdfsFlusherThreads: Int,
-    allocator: PooledByteBufAllocator,
+    allocator: ByteBufAllocator,
     maxComponents: Int) extends Flusher(
     workerSource,
     hdfsFlusherThreads,
@@ -195,7 +195,7 @@ final private[worker] class HdfsFlusher(
 final private[worker] class S3Flusher(
     workerSource: AbstractSource,
     s3FlusherThreads: Int,
-    allocator: PooledByteBufAllocator,
+    allocator: ByteBufAllocator,
     maxComponents: Int) extends Flusher(
     workerSource,
     s3FlusherThreads,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index c2f1c66a1..a1f551bec 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -29,7 +29,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import com.google.common.annotations.VisibleForTesting
-import io.netty.buffer.PooledByteBufAllocator
+import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
@@ -131,8 +131,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   private val deviceMonitor =
     DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, 
workerSource)
 
-  val storageBufferAllocator: PooledByteBufAllocator =
-    NettyUtils.getPooledByteBufAllocator(new TransportConf("StorageManager", 
conf), null, true)
+  val storageBufferAllocator: ByteBufAllocator =
+    NettyUtils.getByteBufAllocator(new TransportConf("StorageManager", conf), 
null, true)
 
   // (mountPoint -> LocalFlusher)
   private val (
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index e24d98e3b..293be2126 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -124,8 +124,7 @@ public class PartitionDataWriterSuiteUtils {
             source,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             1,
-            NettyUtils.getPooledByteBufAllocator(
-                new TransportConf("test", celebornConf), null, true),
+            NettyUtils.getByteBufAllocator(new TransportConf("test", 
celebornConf), null, true),
             256,
             "disk1",
             StorageInfo.Type.HDD,
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
index f3fb4030d..2561bd460 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
@@ -88,7 +88,7 @@ public class DiskMapPartitionDataWriterSuiteJ {
             source,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             1,
-            NettyUtils.getPooledByteBufAllocator(new TransportConf("test", 
CONF), null, true),
+            NettyUtils.getByteBufAllocator(new TransportConf("test", CONF), 
null, true),
             256,
             "disk1",
             StorageInfo.Type.HDD,
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
index 748cd9d83..28abdf184 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
@@ -113,7 +113,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             1,
-            NettyUtils.getPooledByteBufAllocator(new TransportConf("test", 
CONF), null, true),
+            NettyUtils.getByteBufAllocator(new TransportConf("test", CONF), 
null, true),
             256,
             "disk1",
             StorageInfo.Type.HDD,
@@ -424,7 +424,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             1,
-            NettyUtils.getPooledByteBufAllocator(new TransportConf("test", 
CONF), null, true),
+            NettyUtils.getByteBufAllocator(new TransportConf("test", CONF), 
null, true),
             256,
             "disk2",
             StorageInfo.Type.HDD,
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
index 1d39839b0..9de0287ea 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
@@ -25,8 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -60,8 +60,8 @@ public class MemoryPartitionFilesSorterSuiteJ {
     fileInfo = new MemoryFileInfo(userIdentifier, true, new ReduceFileMeta(8 * 
1024 * 1024));
 
     AbstractSource source = Mockito.mock(AbstractSource.class);
-    PooledByteBufAllocator allocator =
-        NettyUtils.getSharedPooledByteBufAllocator(new CelebornConf(), source, 
false);
+    ByteBufAllocator allocator =
+        NettyUtils.getSharedByteBufAllocator(new CelebornConf(), source, 
false);
     CompositeByteBuf buffer = allocator.compositeBuffer();
     fileInfo.setBuffer(buffer);
 
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index 479d19676..677989c73 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import scala.Function0;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.*;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -105,8 +102,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_CHECK_INTERVAL().key(), "10");
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_REPORT_INTERVAL().key(), "10");
     conf.set(CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT().key(), "10ms");
-    PooledByteBufAllocator allocator =
-        NettyUtils.getPooledByteBufAllocator(transConf, source, false);
+    ByteBufAllocator allocator = NettyUtils.getByteBufAllocator(transConf, 
source, false);
     storageManager = Mockito.mock(StorageManager.class);
     AtomicLong evictCount = new AtomicLong();
     Mockito.when(storageManager.evictedFileCount()).thenAnswer(a -> 
evictCount);


Reply via email to