This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6f1bb4164 [CELEBORN-796] Support globally disabling the thread-local
cache in the shared PooledByteBufAllocator
6f1bb4164 is described below
commit 6f1bb41646441698401172a5d6f55becf7ada032
Author: Fu Chen <[email protected]>
AuthorDate: Fri Aug 11 21:49:09 2023 +0800
[CELEBORN-796] Support globally disabling the thread-local cache in the
shared PooledByteBufAllocator
### What changes were proposed in this pull request?
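Add a new configuration, `celeborn.network.memory.allocator.allowCache`
(default `true`), that controls whether the shared `PooledByteBufAllocator`
uses a thread-local cache. When it is set to `false`, the thread-local cache of
the shared allocator is disabled globally, and `ChannelsLimiter` no longer
trims the cache when the worker requests a memory trim.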
### Why are the changes needed?
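Previously, the shared `PooledByteBufAllocator` was always created with its
thread-local cache enabled, and there was no way to turn the cache off.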
### Does this PR introduce _any_ user-facing change?
Yes. The thread-local cache of the shared `PooledByteBufAllocator` can now be
disabled by setting `celeborn.network.memory.allocator.allowCache=false`
(default: `true`).
### How was this patch tested?
Passed the GA (GitHub Actions) CI checks.
Closes #1716 from cfmcgrady/allow-cache.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/common/network/util/NettyUtils.java | 41 +++++++++++++---------
.../org/apache/celeborn/common/CelebornConf.scala | 12 +++++++
.../deploy/worker/memory/ChannelsLimiter.java | 6 +++-
3 files changed, 41 insertions(+), 18 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 69951e187..4c941e864 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -42,7 +42,8 @@ import org.apache.celeborn.common.util.JavaUtils;
/** Utilities for creating various Netty constructs based on whether we're
using EPOLL or NIO. */
public class NettyUtils {
- private static volatile PooledByteBufAllocator _allocator;
+ private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
+ new PooledByteBufAllocator[2];
private static ConcurrentHashMap<String, Integer> allocatorsIndex =
JavaUtils.newConcurrentHashMap();
/** Creates a new ThreadFactory which prefixes each thread with the given
name. */
@@ -118,29 +119,35 @@ public class NettyUtils {
allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
}
- private static PooledByteBufAllocator getSharedPooledByteBufAllocator(
- CelebornConf conf, AbstractSource source) {
- synchronized (PooledByteBufAllocator.class) {
- if (_allocator == null) {
- // each core should have one arena to allocate memory
- _allocator = createPooledByteBufAllocator(true, true,
conf.networkAllocatorArenas());
- if (source != null) {
- new NettyMemoryMetrics(
- _allocator,
- "shared-pool",
- conf.networkAllocatorVerboseMetric(),
- source,
- Collections.emptyMap());
- }
+ /**
+ * Returns the lazily created shared pooled ByteBuf allocator for the
specified allowCache
+ * parameter value.
+ */
+ public static synchronized PooledByteBufAllocator
getSharedPooledByteBufAllocator(
+ CelebornConf conf, AbstractSource source, boolean allowCache) {
+ final int index = allowCache ? 0 : 1;
+ if (_sharedPooledByteBufAllocator[index] == null) {
+ _sharedPooledByteBufAllocator[index] =
+ createPooledByteBufAllocator(true, allowCache,
conf.networkAllocatorArenas());
+ if (source != null) {
+ new NettyMemoryMetrics(
+ _sharedPooledByteBufAllocator[index],
+ "shared-pool",
+ conf.networkAllocatorVerboseMetric(),
+ source,
+ Collections.emptyMap());
}
- return _allocator;
}
+ return _sharedPooledByteBufAllocator[index];
}
public static PooledByteBufAllocator getPooledByteBufAllocator(
TransportConf conf, AbstractSource source, boolean allowCache) {
if (conf.getCelebornConf().networkShareMemoryAllocator()) {
- return getSharedPooledByteBufAllocator(conf.getCelebornConf(), source);
+ return getSharedPooledByteBufAllocator(
+ conf.getCelebornConf(),
+ source,
+ allowCache &&
conf.getCelebornConf().networkMemoryAllocatorAllowCache());
}
PooledByteBufAllocator allocator =
createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache,
conf.clientThreads());
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index fdc4988b1..c493366b2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -460,6 +460,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def networkShareMemoryAllocator: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_SHARE)
+ def networkMemoryAllocatorAllowCache: Boolean =
+ get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)
+
def networkAllocatorArenas: Int =
get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
Runtime.getRuntime.availableProcessors(),
2))
@@ -1234,6 +1237,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
+ val NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE: ConfigEntry[Boolean] =
+ buildConf("celeborn.network.memory.allocator.allowCache")
+ .categories("network")
+ .internal
+ .version("0.3.1")
+ .doc("When false, globally disable thread-local cache in the shared
PooledByteBufAllocator.")
+ .booleanConf
+ .createWithDefault(true)
+
val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
.categories("network")
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
index 050b28166..1d633d105 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
@@ -42,10 +42,12 @@ public class ChannelsLimiter extends ChannelDuplexHandler
private final AtomicBoolean isPaused = new AtomicBoolean(false);
private final AtomicInteger needTrimChannels = new AtomicInteger(0);
private final long waitTrimInterval;
+ private final boolean allowCache;
public ChannelsLimiter(String moduleName, CelebornConf conf) {
this.moduleName = moduleName;
this.waitTrimInterval = conf.workerDirectMemoryTrimChannelWaitInterval();
+ this.allowCache = conf.networkMemoryAllocatorAllowCache();
MemoryManager memoryManager = MemoryManager.instance();
memoryManager.registerMemoryListener(this);
}
@@ -147,7 +149,9 @@ public class ChannelsLimiter extends ChannelDuplexHandler
@Override
public void onTrim() {
- trimCache();
+ if (allowCache) {
+ trimCache();
+ }
}
static class TrimCache {}