This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f1bb4164 [CELEBORN-796] Support globally disabling the thread-local cache in the shared PooledByteBufAllocator
6f1bb4164 is described below

commit 6f1bb41646441698401172a5d6f55becf7ada032
Author: Fu Chen <[email protected]>
AuthorDate: Fri Aug 11 21:49:09 2023 +0800

    [CELEBORN-796] Support globally disabling the thread-local cache in the shared PooledByteBufAllocator
    
    ### What changes were proposed in this pull request?
    
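    As the title says: add the `celeborn.network.memory.allocator.allowCache`
    option (default `true`) so that the thread-local cache of the shared
    `PooledByteBufAllocator` can be disabled globally. `NettyUtils` now keeps
    one lazily created shared allocator per `allowCache` value, and
    `ChannelsLimiter.onTrim()` skips `trimCache()` when the cache is disabled.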
    
    ### Why are the changes needed?
    
    As title
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the thread-local cache of the shared `PooledByteBufAllocator` can be disabled by setting `celeborn.network.memory.allocator.allowCache=false` (the default is `true`).
    
    ### How was this patch tested?
    
    Passes the GitHub Actions (GA) checks.
    
    Closes #1716 from cfmcgrady/allow-cache.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/common/network/util/NettyUtils.java   | 41 +++++++++++++---------
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 +++++++
 .../deploy/worker/memory/ChannelsLimiter.java      |  6 +++-
 3 files changed, 41 insertions(+), 18 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 69951e187..4c941e864 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -42,7 +42,8 @@ import org.apache.celeborn.common.util.JavaUtils;
 
 /** Utilities for creating various Netty constructs based on whether we're 
using EPOLL or NIO. */
 public class NettyUtils {
-  private static volatile PooledByteBufAllocator _allocator;
+  private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
+      new PooledByteBufAllocator[2];
   private static ConcurrentHashMap<String, Integer> allocatorsIndex =
       JavaUtils.newConcurrentHashMap();
   /** Creates a new ThreadFactory which prefixes each thread with the given 
name. */
@@ -118,29 +119,35 @@ public class NettyUtils {
         allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
   }
 
-  private static PooledByteBufAllocator getSharedPooledByteBufAllocator(
-      CelebornConf conf, AbstractSource source) {
-    synchronized (PooledByteBufAllocator.class) {
-      if (_allocator == null) {
-        // each core should have one arena to allocate memory
-        _allocator = createPooledByteBufAllocator(true, true, 
conf.networkAllocatorArenas());
-        if (source != null) {
-          new NettyMemoryMetrics(
-              _allocator,
-              "shared-pool",
-              conf.networkAllocatorVerboseMetric(),
-              source,
-              Collections.emptyMap());
-        }
+  /**
+   * Returns the lazily created shared pooled ByteBuf allocator for the 
specified allowCache
+   * parameter value.
+   */
+  public static synchronized PooledByteBufAllocator 
getSharedPooledByteBufAllocator(
+      CelebornConf conf, AbstractSource source, boolean allowCache) {
+    final int index = allowCache ? 0 : 1;
+    if (_sharedPooledByteBufAllocator[index] == null) {
+      _sharedPooledByteBufAllocator[index] =
+          createPooledByteBufAllocator(true, allowCache, 
conf.networkAllocatorArenas());
+      if (source != null) {
+        new NettyMemoryMetrics(
+            _sharedPooledByteBufAllocator[index],
+            "shared-pool",
+            conf.networkAllocatorVerboseMetric(),
+            source,
+            Collections.emptyMap());
       }
-      return _allocator;
     }
+    return _sharedPooledByteBufAllocator[index];
   }
 
   public static PooledByteBufAllocator getPooledByteBufAllocator(
       TransportConf conf, AbstractSource source, boolean allowCache) {
     if (conf.getCelebornConf().networkShareMemoryAllocator()) {
-      return getSharedPooledByteBufAllocator(conf.getCelebornConf(), source);
+      return getSharedPooledByteBufAllocator(
+          conf.getCelebornConf(),
+          source,
+          allowCache && 
conf.getCelebornConf().networkMemoryAllocatorAllowCache());
     }
     PooledByteBufAllocator allocator =
         createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, 
conf.clientThreads());
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index fdc4988b1..c493366b2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -460,6 +460,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
 
   def networkShareMemoryAllocator: Boolean = 
get(NETWORK_MEMORY_ALLOCATOR_SHARE)
 
+  def networkMemoryAllocatorAllowCache: Boolean =
+    get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)
+
   def networkAllocatorArenas: Int = 
get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
     Runtime.getRuntime.availableProcessors(),
     2))
@@ -1234,6 +1237,15 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("10s")
 
+  val NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE: ConfigEntry[Boolean] =
+    buildConf("celeborn.network.memory.allocator.allowCache")
+      .categories("network")
+      .internal
+      .version("0.3.1")
+      .doc("When false, globally disable thread-local cache in the shared 
PooledByteBufAllocator.")
+      .booleanConf
+      .createWithDefault(true)
+
   val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
     buildConf("celeborn.network.memory.allocator.share")
       .categories("network")
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
index 050b28166..1d633d105 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
@@ -42,10 +42,12 @@ public class ChannelsLimiter extends ChannelDuplexHandler
   private final AtomicBoolean isPaused = new AtomicBoolean(false);
   private final AtomicInteger needTrimChannels = new AtomicInteger(0);
   private final long waitTrimInterval;
+  private final boolean allowCache;
 
   public ChannelsLimiter(String moduleName, CelebornConf conf) {
     this.moduleName = moduleName;
     this.waitTrimInterval = conf.workerDirectMemoryTrimChannelWaitInterval();
+    this.allowCache = conf.networkMemoryAllocatorAllowCache();
     MemoryManager memoryManager = MemoryManager.instance();
     memoryManager.registerMemoryListener(this);
   }
@@ -147,7 +149,9 @@ public class ChannelsLimiter extends ChannelDuplexHandler
 
   @Override
   public void onTrim() {
-    trimCache();
+    if (allowCache) {
+      trimCache();
+    }
   }
 
   static class TrimCache {}

Reply via email to