This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 400e9518d [CELEBORN-2280] Support
celeborn.network.memory.allocator.type to specify netty memory allocator
400e9518d is described below
commit 400e9518d7f19cacf8797971e169afa6bdea5be6
Author: SteNicholas <[email protected]>
AuthorDate: Sat Mar 14 17:27:00 2026 +0800
[CELEBORN-2280] Support celeborn.network.memory.allocator.type to specify
netty memory allocator
### What changes were proposed in this pull request?
Support `celeborn.network.memory.allocator.type` to specify the Netty memory
allocator type, including `AdaptiveByteBufAllocator`.
### Why are the changes needed?
Netty 4.2 introduces `AdaptiveByteBufAllocator`, an auto-tuning pooling
`ByteBufAllocator` that uses the `AdaptivePoolingAllocator` added in
https://github.com/netty/netty/pull/13075.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
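Yes. Introduces `celeborn.network.memory.allocator.type` (`POOLED`, `UNPOOLED`
or `ADAPTIVE`; default `POOLED`) to specify the Netty memory allocator type. It
replaces `celeborn.network.memory.allocator.pooled`, which is removed.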
### How was this patch tested?
CI.
Closes #3625 from SteNicholas/CELEBORN-2280.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../network/util/NettyMemoryAllocatorType.java | 33 ++++++++++++
.../celeborn/common/network/util/NettyUtils.java | 61 ++++++++++++----------
.../org/apache/celeborn/common/CelebornConf.scala | 28 ++++++----
docs/migration.md | 2 +
4 files changed, 86 insertions(+), 38 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryAllocatorType.java
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryAllocatorType.java
new file mode 100644
index 000000000..1aea6efbc
--- /dev/null
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryAllocatorType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.util;
+
+import io.netty.buffer.AdaptiveByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+/** Netty memory allocator type of {@link ByteBufAllocator}. */
+public enum NettyMemoryAllocatorType {
+ /** A pooled {@link ByteBufAllocator}: {@link PooledByteBufAllocator}. */
+ POOLED,
+ /** A un-pooled {@link ByteBufAllocator}: {@link UnpooledByteBufAllocator}.
*/
+ UNPOOLED,
+ /** An auto-tuning pooling {@link ByteBufAllocator}: {@link
AdaptiveByteBufAllocator}. */
+ ADAPTIVE
+}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 109328233..df9d507a2 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
+import io.netty.buffer.AdaptiveByteBufAllocator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -132,7 +133,7 @@ public class NettyUtils {
/**
* Create a ByteBufAllocator that respects the parameters
*
- * @param pooled If true, create a PooledByteBufAllocator, otherwise
UnpooledByteBufAllocator
+ * @param allocatorType netty memory allocator type.
* @param allowDirectBufs If true and platform supports, allocate ByteBuf in
direct memory,
* otherwise in heap memory.
* @param allowCache If true, enable thread-local cache, it only take effect
for
@@ -141,23 +142,32 @@ public class NettyUtils {
* effect for PooledByteBufAllocator.
*/
private static ByteBufAllocator createByteBufAllocator(
- boolean pooled, boolean allowDirectBufs, boolean allowCache, int
numCores) {
- if (pooled) {
- if (numCores == 0) {
- numCores = Runtime.getRuntime().availableProcessors();
- }
- return new PooledByteBufAllocator(
- allowDirectBufs && PlatformDependent.directBufferPreferred(),
- Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
- Math.min(PooledByteBufAllocator.defaultNumDirectArena(),
allowDirectBufs ? numCores : 0),
- PooledByteBufAllocator.defaultPageSize(),
- PooledByteBufAllocator.defaultMaxOrder(),
- allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
- allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
- allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
- } else {
- return new UnpooledByteBufAllocator(
- allowDirectBufs && PlatformDependent.directBufferPreferred());
+ NettyMemoryAllocatorType allocatorType,
+ boolean allowDirectBufs,
+ boolean allowCache,
+ int numCores) {
+ boolean preferDirect = allowDirectBufs &&
PlatformDependent.directBufferPreferred();
+ switch (allocatorType) {
+ case POOLED:
+ if (numCores == 0) {
+ numCores = Runtime.getRuntime().availableProcessors();
+ }
+ return new PooledByteBufAllocator(
+ preferDirect,
+ Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
+ Math.min(
+ PooledByteBufAllocator.defaultNumDirectArena(),
allowDirectBufs ? numCores : 0),
+ PooledByteBufAllocator.defaultPageSize(),
+ PooledByteBufAllocator.defaultMaxOrder(),
+ allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
+ allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
+ allowCache &&
PooledByteBufAllocator.defaultUseCacheForAllThreads());
+ case UNPOOLED:
+ return new UnpooledByteBufAllocator(preferDirect);
+ case ADAPTIVE:
+ return new AdaptiveByteBufAllocator(preferDirect);
+ default:
+ throw new IllegalArgumentException("Unknown allocator type: " +
allocatorType);
}
}
@@ -169,10 +179,10 @@ public class NettyUtils {
CelebornConf conf, AbstractSource source, boolean allowCache) {
final int index = allowCache ? 0 : 1;
if (_sharedByteBufAllocator[index] == null) {
+ NettyMemoryAllocatorType allocatorType =
conf.networkMemoryAllocatorType();
_sharedByteBufAllocator[index] =
- createByteBufAllocator(
- conf.networkMemoryAllocatorPooled(), true, allowCache,
conf.networkAllocatorArenas());
- if (conf.networkMemoryAllocatorPooled()) {
+ createByteBufAllocator(allocatorType, true, allowCache,
conf.networkAllocatorArenas());
+ if (allocatorType == NettyMemoryAllocatorType.POOLED) {
pooledByteBufAllocators.add((PooledByteBufAllocator)
_sharedByteBufAllocator[index]);
}
if (source != null) {
@@ -206,13 +216,10 @@ public class NettyUtils {
} else {
arenas = conf.getCelebornConf().networkAllocatorArenas();
}
+ NettyMemoryAllocatorType allocatorType =
conf.getCelebornConf().networkMemoryAllocatorType();
ByteBufAllocator allocator =
- createByteBufAllocator(
- conf.getCelebornConf().networkMemoryAllocatorPooled(),
- conf.preferDirectBufs(),
- allowCache,
- arenas);
- if (conf.getCelebornConf().networkMemoryAllocatorPooled()) {
+ createByteBufAllocator(allocatorType, conf.preferDirectBufs(),
allowCache, arenas);
+ if (allocatorType == NettyMemoryAllocatorType.POOLED) {
pooledByteBufAllocators.add((PooledByteBufAllocator) allocator);
}
if (source != null) {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6ec87a7c4..b1a4a469e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -36,7 +36,7 @@ import
org.apache.celeborn.common.client.{ApplicationInfoProvider, DefaultApplic
import org.apache.celeborn.common.identity.{DefaultIdentityProvider,
HadoopBasedIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
-import org.apache.celeborn.common.network.util.{ByteUnit, IOMode}
+import org.apache.celeborn.common.network.util.{ByteUnit, IOMode,
NettyMemoryAllocatorType}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.StorageInfo.Type
import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
@@ -616,8 +616,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def networkMemoryAllocatorAllowCache: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)
- def networkMemoryAllocatorPooled: Boolean =
- get(NETWORK_MEMORY_ALLOCATOR_POOLED)
+ def networkMemoryAllocatorType: NettyMemoryAllocatorType =
+ NettyMemoryAllocatorType.valueOf(get(NETWORK_MEMORY_ALLOCATOR_TYPE))
def networkAllocatorArenas: Int =
get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
Runtime.getRuntime.availableProcessors(),
@@ -1949,16 +1949,22 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
- val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] =
- buildConf("celeborn.network.memory.allocator.pooled")
+ val NETWORK_MEMORY_ALLOCATOR_TYPE: ConfigEntry[String] =
+ buildConf("celeborn.network.memory.allocator.type")
.categories("network")
.internal
- .version("0.6.0")
- .doc("If disabled, always use UnpooledByteBufAllocator for aggressive
memory reclamation, " +
- "this is helpful for cases that worker has high memory usage even
after triming. " +
- "Disabling would cause performace degression and higher CPU usage.")
- .booleanConf
- .createWithDefault(true)
+ .version("0.7.0")
+ .doc("Specifies netty memory allocator type including: " +
+ s"${NettyMemoryAllocatorType.POOLED.name}: use PooledByteBufAllocator,
which is the default and recommended for better performance. " +
+ s"${NettyMemoryAllocatorType.UNPOOLED.name}: use
UnpooledByteBufAllocator, which is more aggressive in memory reclamation and
may cause performance degradation and higher CPU usage. " +
+ s"${NettyMemoryAllocatorType.ADAPTIVE.name}: use
AdaptiveByteBufAllocator, which is recommended to roll out usage slowly, and to
carefully monitor application performance in the process.")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set(
+ NettyMemoryAllocatorType.POOLED.name,
+ NettyMemoryAllocatorType.UNPOOLED.name,
+ NettyMemoryAllocatorType.ADAPTIVE.name))
+ .createWithDefault(NettyMemoryAllocatorType.POOLED.name)
val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
diff --git a/docs/migration.md b/docs/migration.md
index 7e0385f36..810f88b2b 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -33,6 +33,8 @@ license: |
- Since 0.7.0, Celeborn changed the default value of
`celeborn.worker.directMemoryRatioForReadBuffer` from `0.1` to `0.35`, which
means read buffer threshold of buffer dispatcher is max direct memory * 0.35 at
default.
+- Since 0.7.0, Celeborn removed `celeborn.network.memory.allocator.pooled`.
Please use `celeborn.network.memory.allocator.type` instead.
+
# Upgrading from 0.5 to 0.6
- Since 0.6.0, Celeborn deprecate
`celeborn.client.spark.fetch.throwsFetchFailure`. Please use
`celeborn.client.spark.stageRerun.enabled` instead.