This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a318eb43a [CELEBORN-1815] Support UnpooledByteBufAllocator
a318eb43a is described below
commit a318eb43aba0f2a767f8eb5ca0c3c8c35bcd2da6
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jan 2 20:54:34 2025 +0800
[CELEBORN-1815] Support UnpooledByteBufAllocator
### What changes were proposed in this pull request?
This PR introduces a configuration
`celeborn.network.memory.allocator.pooled` to allow users to disable
`PooledByteBufAllocator` globally and always use `UnpooledByteBufAllocator`.
### Why are the changes needed?
In some extreme cases, Netty's `PooledByteBufAllocator` may hold many 4 MiB
chunks while only a small fraction of their capacity is actually used by real
data (see https://github.com/apache/celeborn/pull/3018). For scenarios where
stability is more important than performance, it is desirable to allow users
to disable the `PooledByteBufAllocator` globally.
### Does this PR introduce _any_ user-facing change?
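Yes, a new internal configuration is added. It defaults to `true` (pooled allocation), so the existing behavior is unchanged unless users explicitly set it to `false`.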
### How was this patch tested?
Existing unit tests pass, which covers correctness. The performance and memory
impact still need to be verified on a production-scale cluster.
Closes #3043 from pan3793/CELEBORN-1815.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../flink/network/FlinkTransportClientFactory.java | 2 +-
.../network/client/TransportClientFactory.java | 13 ++-
.../common/network/server/TransportServer.java | 6 +-
.../common/network/util/NettyMemoryMetrics.java | 126 ++++++++++++---------
.../celeborn/common/network/util/NettyUtils.java | 80 +++++++------
.../org/apache/celeborn/common/CelebornConf.scala | 14 +++
.../deploy/worker/memory/MemoryManager.java | 4 +-
.../deploy/worker/memory/ReadBufferDispatcher.java | 11 +-
.../deploy/worker/storage/PartitionDataWriter.java | 8 +-
.../worker/storage/PartitionFilesSorter.java | 6 +-
.../service/deploy/worker/storage/Flusher.scala | 10 +-
.../deploy/worker/storage/StorageManager.scala | 6 +-
.../storage/PartitionDataWriterSuiteUtils.java | 3 +-
.../local/DiskMapPartitionDataWriterSuiteJ.java | 2 +-
.../local/DiskReducePartitionDataWriterSuiteJ.java | 4 +-
.../memory/MemoryPartitionFilesSorterSuiteJ.java | 6 +-
.../MemoryReducePartitionDataWriterSuiteJ.java | 8 +-
17 files changed, 176 insertions(+), 133 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index 0bfaaf99e..244d8703e 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -45,7 +45,7 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
TransportContext context, List<TransportClientBootstrap> bootstraps, int
bufferSizeBytes) {
super(context, bootstraps);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
- this.pooledAllocator = new UnpooledByteBufAllocator(true);
+ this.allocator = new UnpooledByteBufAllocator(true);
this.bufferSizeBytes = bufferSizeBytes;
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 647ef280b..fac64490b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -93,7 +93,7 @@ public class TransportClientFactory implements Closeable {
private final int sendBuf;
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
- protected ByteBufAllocator pooledAllocator;
+ protected ByteBufAllocator allocator;
private final int maxClientConnectRetries;
private final int maxClientConnectRetryWaitTimeMs;
@@ -115,9 +115,12 @@ public class TransportClientFactory implements Closeable {
logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode,
conf.clientThreads());
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(),
conf.getModuleName() + "-client");
- this.pooledAllocator =
- NettyUtils.getPooledByteBufAllocator(
- conf, context.getSource(), false, conf.clientThreads());
+ // Always disable thread-local cache when creating pooled ByteBuf
allocator for TransportClients
+ // because the ByteBufs are allocated by the event loop thread, but
released by the executor
+ // thread rather than the event loop thread. Those thread-local caches
actually delay the
+ // recycling of buffers, leading to larger memory usage.
+ this.allocator =
+ NettyUtils.getByteBufAllocator(conf, context.getSource(), false,
conf.clientThreads());
this.maxClientConnectRetries = conf.maxIORetries();
this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs();
}
@@ -268,7 +271,7 @@ public class TransportClientFactory implements Closeable {
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
- .option(ChannelOption.ALLOCATOR, pooledAllocator);
+ .option(ChannelOption.ALLOCATOR, allocator);
if (receiveBuf > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index 672a450bc..1808a000c 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -91,8 +91,8 @@ public class TransportServer implements Closeable {
EventLoopGroup workerGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
conf.getModuleName() + "-server");
- PooledByteBufAllocator allocator =
- NettyUtils.getPooledByteBufAllocator(conf, source, true,
conf.serverThreads());
+ ByteBufAllocator allocator =
+ NettyUtils.getByteBufAllocator(conf, source, true,
conf.serverThreads());
bootstrap =
new ServerBootstrap()
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
index 440bcd709..59bba341b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
@@ -23,19 +23,17 @@ import java.util.*;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.netty.buffer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.metrics.source.AbstractSource;
-/** A Netty memory metrics class to collect metrics from Netty
PooledByteBufAllocator. */
+/** A Netty memory metrics class to collect metrics from Netty
ByteBufAllocator. */
public class NettyMemoryMetrics {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final PooledByteBufAllocator pooledAllocator;
+ private final ByteBufAllocator allocator;
private final boolean verboseMetricsEnabled;
@@ -69,71 +67,87 @@ public class NettyMemoryMetrics {
}
public NettyMemoryMetrics(
- PooledByteBufAllocator pooledAllocator,
+ ByteBufAllocator allocator,
String metricPrefix,
boolean verboseMetricsEnabled,
AbstractSource source,
Map<String, String> labels) {
- this.pooledAllocator = pooledAllocator;
+ this.allocator = allocator;
this.metricPrefix = metricPrefix;
this.verboseMetricsEnabled = verboseMetricsEnabled;
this.source = source;
this.labels = labels;
- registerMetrics(this.pooledAllocator);
+ registerMetrics();
}
- private void registerMetrics(PooledByteBufAllocator allocator) {
- PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric();
-
+ private void registerMetrics() {
// Register general metrics.
if (source != null) {
- logger.debug("setup netty metrics");
- source.addGauge(
- MetricRegistry.name(metricPrefix, "usedHeapMemory"),
- labels,
- pooledAllocatorMetric::usedHeapMemory);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "usedDirectMemory"),
- labels,
- pooledAllocatorMetric::usedDirectMemory);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "numHeapArenas"),
- labels,
- pooledAllocatorMetric::numHeapArenas);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "numDirectArenas"),
- labels,
- pooledAllocatorMetric::numDirectArenas);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "tinyCacheSize"),
- labels,
- pooledAllocatorMetric::tinyCacheSize);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "smallCacheSize"),
- labels,
- pooledAllocatorMetric::smallCacheSize);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "normalCacheSize"),
- labels,
- pooledAllocatorMetric::normalCacheSize);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "numThreadLocalCaches"),
- labels,
- pooledAllocatorMetric::numThreadLocalCaches);
- source.addGauge(
- MetricRegistry.name(metricPrefix, "chunkSize"), labels,
pooledAllocatorMetric::chunkSize);
- if (verboseMetricsEnabled) {
- int directArenaIndex = 0;
- for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
- registerArenaMetric(metric, "directArena" + directArenaIndex);
- directArenaIndex++;
- }
+ if (allocator instanceof UnpooledByteBufAllocator) {
+ logger.debug("Setup netty metrics for UnpooledByteBufAllocator");
+ ByteBufAllocatorMetric unpooledMetric = ((UnpooledByteBufAllocator)
allocator).metric();
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "usedHeapMemory"),
+ labels,
+ unpooledMetric::usedHeapMemory);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "usedDirectMemory"),
+ labels,
+ unpooledMetric::usedDirectMemory);
+ } else if (allocator instanceof PooledByteBufAllocator) {
+ logger.debug("Setup netty metrics for PooledByteBufAllocator");
+ PooledByteBufAllocatorMetric pooledAllocatorMetric =
+ ((PooledByteBufAllocator) allocator).metric();
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "usedHeapMemory"),
+ labels,
+ pooledAllocatorMetric::usedHeapMemory);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "usedDirectMemory"),
+ labels,
+ pooledAllocatorMetric::usedDirectMemory);
- int heapArenaIndex = 0;
- for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
- registerArenaMetric(metric, "heapArena" + heapArenaIndex);
- heapArenaIndex++;
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "numHeapArenas"),
+ labels,
+ pooledAllocatorMetric::numHeapArenas);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "numDirectArenas"),
+ labels,
+ pooledAllocatorMetric::numDirectArenas);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "tinyCacheSize"),
+ labels,
+ pooledAllocatorMetric::tinyCacheSize);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "smallCacheSize"),
+ labels,
+ pooledAllocatorMetric::smallCacheSize);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "normalCacheSize"),
+ labels,
+ pooledAllocatorMetric::normalCacheSize);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "numThreadLocalCaches"),
+ labels,
+ pooledAllocatorMetric::numThreadLocalCaches);
+ source.addGauge(
+ MetricRegistry.name(metricPrefix, "chunkSize"),
+ labels,
+ pooledAllocatorMetric::chunkSize);
+ if (verboseMetricsEnabled) {
+ int directArenaIndex = 0;
+ for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
+ registerArenaMetric(metric, "directArena" + directArenaIndex);
+ directArenaIndex++;
+ }
+
+ int heapArenaIndex = 0;
+ for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
+ registerArenaMetric(metric, "heapArena" + heapArenaIndex);
+ heapArenaIndex++;
+ }
}
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 0a7641247..596b80785 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -23,7 +23,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -42,9 +44,8 @@ import org.apache.celeborn.common.util.JavaUtils;
/** Utilities for creating various Netty constructs based on whether we're
using EPOLL or NIO. */
public class NettyUtils {
- private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
- new PooledByteBufAllocator[2];
- private static ConcurrentHashMap<String, Integer> allocatorsIndex =
+ private static final ByteBufAllocator[] _sharedByteBufAllocator = new
ByteBufAllocator[2];
+ private static final ConcurrentHashMap<String, Integer> allocatorsIndex =
JavaUtils.newConcurrentHashMap();
/** Creates a new ThreadFactory which prefixes each thread with the given
name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
@@ -98,58 +99,69 @@ public class NettyUtils {
}
/**
- * Create a pooled ByteBuf allocator but disables the thread-local cache.
Thread-local caches are
- * disabled for TransportClients because the ByteBufs are allocated by the
event loop thread, but
- * released by the executor thread rather than the event loop thread. Those
thread-local caches
- * actually delay the recycling of buffers, leading to larger memory usage.
+ * Create a ByteBufAllocator that respects the parameters
+ *
+ * @param pooled If true, create a PooledByteBufAllocator, otherwise
UnpooledByteBufAllocator
+ * @param allowDirectBufs If true and platform supports, allocate ByteBuf in
direct memory,
+ * otherwise in heap memory.
+ * @param allowCache If true, enable thread-local cache, it only take effect
for
+ * PooledByteBufAllocator.
+ * @param numCores Number of heap/direct arenas, 0 means use number of cpu
cores, it only take
+ * effect for PooledByteBufAllocator.
*/
- private static PooledByteBufAllocator createPooledByteBufAllocator(
- boolean allowDirectBufs, boolean allowCache, int numCores) {
- if (numCores == 0) {
- numCores = Runtime.getRuntime().availableProcessors();
+ private static ByteBufAllocator createByteBufAllocator(
+ boolean pooled, boolean allowDirectBufs, boolean allowCache, int
numCores) {
+ if (pooled) {
+ if (numCores == 0) {
+ numCores = Runtime.getRuntime().availableProcessors();
+ }
+ return new PooledByteBufAllocator(
+ allowDirectBufs && PlatformDependent.directBufferPreferred(),
+ Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
+ Math.min(PooledByteBufAllocator.defaultNumDirectArena(),
allowDirectBufs ? numCores : 0),
+ PooledByteBufAllocator.defaultPageSize(),
+ PooledByteBufAllocator.defaultMaxOrder(),
+ allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
+ allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
+ allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
+ } else {
+ return new UnpooledByteBufAllocator(
+ allowDirectBufs && PlatformDependent.directBufferPreferred());
}
- return new PooledByteBufAllocator(
- allowDirectBufs && PlatformDependent.directBufferPreferred(),
- Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
- Math.min(PooledByteBufAllocator.defaultNumDirectArena(),
allowDirectBufs ? numCores : 0),
- PooledByteBufAllocator.defaultPageSize(),
- PooledByteBufAllocator.defaultMaxOrder(),
- allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
- allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
- allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
}
/**
* Returns the lazily created shared pooled ByteBuf allocator for the
specified allowCache
* parameter value.
*/
- public static synchronized PooledByteBufAllocator
getSharedPooledByteBufAllocator(
+ public static synchronized ByteBufAllocator getSharedByteBufAllocator(
CelebornConf conf, AbstractSource source, boolean allowCache) {
final int index = allowCache ? 0 : 1;
- if (_sharedPooledByteBufAllocator[index] == null) {
- _sharedPooledByteBufAllocator[index] =
- createPooledByteBufAllocator(true, allowCache,
conf.networkAllocatorArenas());
+ if (_sharedByteBufAllocator[index] == null) {
+ _sharedByteBufAllocator[index] =
+ createByteBufAllocator(
+ conf.networkMemoryAllocatorPooled(), true, allowCache,
conf.networkAllocatorArenas());
if (source != null) {
new NettyMemoryMetrics(
- _sharedPooledByteBufAllocator[index],
+ _sharedByteBufAllocator[index],
"shared-pool-" + index,
conf.networkAllocatorVerboseMetric(),
source,
Collections.emptyMap());
}
}
- return _sharedPooledByteBufAllocator[index];
+ return _sharedByteBufAllocator[index];
}
- public static PooledByteBufAllocator getPooledByteBufAllocator(
+ public static ByteBufAllocator getByteBufAllocator(
TransportConf conf, AbstractSource source, boolean allowCache) {
- return getPooledByteBufAllocator(conf, source, allowCache, 0);
+ return getByteBufAllocator(conf, source, allowCache, 0);
}
- public static PooledByteBufAllocator getPooledByteBufAllocator(
+ public static ByteBufAllocator getByteBufAllocator(
TransportConf conf, AbstractSource source, boolean allowCache, int
coreNum) {
if (conf.getCelebornConf().networkShareMemoryAllocator()) {
- return getSharedPooledByteBufAllocator(
+ return getSharedByteBufAllocator(
conf.getCelebornConf(),
source,
allowCache &&
conf.getCelebornConf().networkMemoryAllocatorAllowCache());
@@ -160,8 +172,12 @@ public class NettyUtils {
} else {
arenas = conf.getCelebornConf().networkAllocatorArenas();
}
- PooledByteBufAllocator allocator =
- createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache,
arenas);
+ ByteBufAllocator allocator =
+ createByteBufAllocator(
+ conf.getCelebornConf().networkMemoryAllocatorPooled(),
+ conf.preferDirectBufs(),
+ allowCache,
+ arenas);
if (source != null) {
String poolName = "default-netty-pool";
Map<String, String> labels = new HashMap<>();
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a8cdf8b7d..7280a1cde 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -603,6 +603,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def networkMemoryAllocatorAllowCache: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)
+ def networkMemoryAllocatorPooled: Boolean =
+ get(NETWORK_MEMORY_ALLOCATOR_POOLED)
+
def networkAllocatorArenas: Int =
get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
Runtime.getRuntime.availableProcessors(),
2))
@@ -1782,6 +1785,17 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.network.memory.allocator.pooled")
+ .categories("network")
+ .internal
+ .version("0.6.0")
+ .doc("If disabled, always use UnpooledByteBufAllocator for aggressive
memory reclamation, " +
+ "this is helpful for cases that worker has high memory usage even
after triming. " +
+ "Disabling would cause performace degression and higher CPU usage.")
+ .booleanConf
+ .createWithDefault(true)
+
val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
.categories("network")
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index db368aa21..32704cf04 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.LongAdder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -546,7 +546,7 @@ public class MemoryManager {
readBufferDispatcher.close();
}
- public PooledByteBufAllocator getStoragePooledByteBufAllocator() {
+ public ByteBufAllocator getStorageByteBufAllocator() {
return storageManager.storageBufferAllocator();
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index 1646095ec..5c93435c9 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.LongAdder;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
- private final PooledByteBufAllocator readBufferAllocator;
+ private final ByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
@VisibleForTesting public volatile boolean stopFlag = false;
@@ -51,7 +52,7 @@ public class ReadBufferDispatcher {
long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
- NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
+ NettyUtils.getByteBufAllocator(new TransportConf("readBuffer", conf),
null, true);
this.memoryManager = memoryManager;
dispatcherThread =
new AtomicReference<>(
@@ -118,8 +119,10 @@ public class ReadBufferDispatcher {
if (request != null) {
processBufferRequest(request, buffers);
} else {
- // Free buffer pool memory to main direct memory when dispatcher
is idle.
- readBufferAllocator.trimCurrentThreadCache();
+ if (readBufferAllocator instanceof PooledByteBufAllocator) {
+ // Free buffer pool memory to main direct memory when
dispatcher is idle.
+ ((PooledByteBufAllocator)
readBufferAllocator).trimCurrentThreadCache();
+ }
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 185eeb950..c711752fb 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -31,8 +31,8 @@ import scala.Tuple4;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.RoaringBitmap;
@@ -102,7 +102,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
protected final long memoryFileStorageMaxFileSize;
protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean();
protected final String filename;
- protected PooledByteBufAllocator pooledByteBufAllocator;
+ protected ByteBufAllocator allocator;
private final PartitionDataWriterContext writerContext;
private final long localFlusherBufferSize;
private final long hdfsFlusherBufferSize;
@@ -155,7 +155,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
// Reduce partition data writers support memory storage now
if (supportInMemory && createFileResult._1() != null) {
this.memoryFileInfo = createFileResult._1();
- this.pooledByteBufAllocator = storageManager.storageBufferAllocator();
+ this.allocator = storageManager.storageBufferAllocator();
this.isMemoryShuffleFile.set(true);
storageManager.registerMemoryPartitionWriter(this,
createFileResult._1());
} else if (createFileResult._2() != null) {
@@ -669,7 +669,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
flushBuffer = flusher.takeBuffer();
} else {
if (flushBuffer == null) {
- flushBuffer =
pooledByteBufAllocator.compositeBuffer(Integer.MAX_VALUE);
+ flushBuffer = allocator.compositeBuffer(Integer.MAX_VALUE);
}
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 2cc6c3292..dfc2f3e99 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -211,9 +211,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
startMapIndex, endMapIndex, shuffleChunkSize, indexesMap,
true),
shuffleChunkSize);
CompositeByteBuf targetBuffer =
- MemoryManager.instance()
- .getStoragePooledByteBufAllocator()
- .compositeBuffer(Integer.MAX_VALUE);
+
MemoryManager.instance().getStorageByteBufAllocator().compositeBuffer(Integer.MAX_VALUE);
ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
startMapIndex,
endMapIndex,
@@ -334,7 +332,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
// because this will affect origin buffer's reference count
CompositeByteBuf sortedBuffer =
MemoryManager.instance()
- .getStoragePooledByteBufAllocator()
+ .getStorageByteBufAllocator()
.compositeBuffer(Integer.MAX_VALUE - 1);
Map<Integer, List<ShuffleBlockInfo>> sortedBlocks = new TreeMap<>();
int sortedBufferIndex = 0;
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 205ce8bcc..a10059436 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicLongArray}
import scala.util.Random
-import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator}
+import io.netty.buffer.{ByteBufAllocator, CompositeByteBuf,
PooledByteBufAllocator}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow}
@@ -38,7 +38,7 @@ import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager
abstract private[worker] class Flusher(
val workerSource: AbstractSource,
val threadCount: Int,
- val allocator: PooledByteBufAllocator,
+ val allocator: ByteBufAllocator,
val maxComponents: Int,
flushTimeMetric: TimeWindow,
mountPoint: String) extends Logging {
@@ -137,7 +137,7 @@ private[worker] class LocalFlusher(
workerSource: AbstractSource,
val deviceMonitor: DeviceMonitor,
threadCount: Int,
- allocator: PooledByteBufAllocator,
+ allocator: ByteBufAllocator,
maxComponents: Int,
val mountPoint: String,
val diskType: StorageInfo.Type,
@@ -176,7 +176,7 @@ private[worker] class LocalFlusher(
final private[worker] class HdfsFlusher(
workerSource: AbstractSource,
hdfsFlusherThreads: Int,
- allocator: PooledByteBufAllocator,
+ allocator: ByteBufAllocator,
maxComponents: Int) extends Flusher(
workerSource,
hdfsFlusherThreads,
@@ -195,7 +195,7 @@ final private[worker] class HdfsFlusher(
final private[worker] class S3Flusher(
workerSource: AbstractSource,
s3FlusherThreads: Int,
- allocator: PooledByteBufAllocator,
+ allocator: ByteBufAllocator,
maxComponents: Int) extends Flusher(
workerSource,
s3FlusherThreads,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index c2f1c66a1..a1f551bec 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -29,7 +29,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration._
import com.google.common.annotations.VisibleForTesting
-import io.netty.buffer.PooledByteBufAllocator
+import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
@@ -131,8 +131,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
private val deviceMonitor =
DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos,
workerSource)
- val storageBufferAllocator: PooledByteBufAllocator =
- NettyUtils.getPooledByteBufAllocator(new TransportConf("StorageManager",
conf), null, true)
+ val storageBufferAllocator: ByteBufAllocator =
+ NettyUtils.getByteBufAllocator(new TransportConf("StorageManager", conf),
null, true)
// (mountPoint -> LocalFlusher)
private val (
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index e24d98e3b..293be2126 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -124,8 +124,7 @@ public class PartitionDataWriterSuiteUtils {
source,
DeviceMonitor$.MODULE$.EmptyMonitor(),
1,
- NettyUtils.getPooledByteBufAllocator(
- new TransportConf("test", celebornConf), null, true),
+ NettyUtils.getByteBufAllocator(new TransportConf("test",
celebornConf), null, true),
256,
"disk1",
StorageInfo.Type.HDD,
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
index f3fb4030d..2561bd460 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
@@ -88,7 +88,7 @@ public class DiskMapPartitionDataWriterSuiteJ {
source,
DeviceMonitor$.MODULE$.EmptyMonitor(),
1,
- NettyUtils.getPooledByteBufAllocator(new TransportConf("test",
CONF), null, true),
+ NettyUtils.getByteBufAllocator(new TransportConf("test", CONF),
null, true),
256,
"disk1",
StorageInfo.Type.HDD,
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
index 748cd9d83..28abdf184 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
@@ -113,7 +113,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
DeviceMonitor$.MODULE$.EmptyMonitor(),
1,
- NettyUtils.getPooledByteBufAllocator(new TransportConf("test",
CONF), null, true),
+ NettyUtils.getByteBufAllocator(new TransportConf("test", CONF),
null, true),
256,
"disk1",
StorageInfo.Type.HDD,
@@ -424,7 +424,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
DeviceMonitor$.MODULE$.EmptyMonitor(),
1,
- NettyUtils.getPooledByteBufAllocator(new TransportConf("test",
CONF), null, true),
+ NettyUtils.getByteBufAllocator(new TransportConf("test", CONF),
null, true),
256,
"disk2",
StorageInfo.Type.HDD,
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
index 1d39839b0..9de0287ea 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java
@@ -25,8 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -60,8 +60,8 @@ public class MemoryPartitionFilesSorterSuiteJ {
fileInfo = new MemoryFileInfo(userIdentifier, true, new ReduceFileMeta(8 *
1024 * 1024));
AbstractSource source = Mockito.mock(AbstractSource.class);
- PooledByteBufAllocator allocator =
- NettyUtils.getSharedPooledByteBufAllocator(new CelebornConf(), source,
false);
+ ByteBufAllocator allocator =
+ NettyUtils.getSharedByteBufAllocator(new CelebornConf(), source,
false);
CompositeByteBuf buffer = allocator.compositeBuffer();
fileInfo.setBuffer(buffer);
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index 479d19676..677989c73 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import scala.Function0;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.*;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -105,8 +102,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_CHECK_INTERVAL().key(), "10");
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_REPORT_INTERVAL().key(), "10");
conf.set(CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT().key(), "10ms");
- PooledByteBufAllocator allocator =
- NettyUtils.getPooledByteBufAllocator(transConf, source, false);
+ ByteBufAllocator allocator = NettyUtils.getByteBufAllocator(transConf,
source, false);
storageManager = Mockito.mock(StorageManager.class);
AtomicLong evictCount = new AtomicLong();
Mockito.when(storageManager.evictedFileCount()).thenAnswer(a ->
evictCount);