This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e3ec90f5c [#1802] improvement(netty): Allow sharing Netty's memory
allocators (#1803)
e3ec90f5c is described below
commit e3ec90f5ce45fbc5c1ef07ce1e8889ddb0c9a9a9
Author: RickyMa <[email protected]>
AuthorDate: Tue Jun 18 21:03:30 2024 +0800
[#1802] improvement(netty): Allow sharing Netty's memory allocators (#1803)
### What changes were proposed in this pull request?
Introducing shared ByteBuf allocators.
This feature is controlled by the
`rss.client.netty.client.shared.allocator.enabled` configuration.
If enabled, only three ByteBuf allocators are created:
one PooledByteBufAllocator where caching is allowed, one
PooledByteBufAllocator where it is not, and one UnpooledByteBufAllocator.
When disabled, a new allocator is created for each transport client.
We set the default value to true to reduce memory usage; for background, see
https://github.com/apache/spark/pull/23278.
### Why are the changes needed?
For: https://github.com/apache/incubator-uniffle/issues/1802.
### Does this PR introduce _any_ user-facing change?
The `rss.client.netty.client.shared.allocator.enabled` configuration is
introduced.
### How was this patch tested?
Existing UTs.
---
.../uniffle/common/config/RssClientConf.java | 10 +++
.../netty/client/TransportClientFactory.java | 30 ++++++---
.../uniffle/common/netty/client/TransportConf.java | 4 ++
.../uniffle/common/netty/protocol/Decoders.java | 2 +-
.../apache/uniffle/common/util/GrpcNettyUtils.java | 4 +-
.../org/apache/uniffle/common/util/NettyUtils.java | 71 ++++++++++++++++++----
docs/client_guide/client_guide.md | 23 +++----
.../uniffle/client/impl/grpc/GrpcClient.java | 4 +-
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 4 +-
.../server/buffer/AbstractShuffleBuffer.java | 2 +-
.../apache/uniffle/server/netty/StreamServer.java | 4 +-
11 files changed, 117 insertions(+), 41 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 21350c6fd..570ea50c0 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -170,6 +170,16 @@ public class RssClientConf {
.withDescription(
"If true, we will use PooledByteBufAllocator to allocate byte
buffers within Netty, otherwise we'll use UnpooledByteBufAllocator.");
+ public static final ConfigOption<Boolean>
NETTY_CLIENT_SHARED_ALLOCATOR_ENABLED =
+ ConfigOptions.key("rss.client.netty.client.shared.allocator.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "A flag indicating whether to share the ByteBuf allocators
between the different Netty channels when enabling Netty. "
+ + "If enabled then only three ByteBuf allocators are
created: "
+ + "one PooledByteBufAllocator where caching is allowed, one
PooledByteBufAllocator where not and one UnpooledByteBufAllocator. "
+ + "When disabled, a new allocator is created for each
transport client.");
+
public static final ConfigOption<Integer>
NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER =
ConfigOptions.key("rss.client.netty.client.connections.per.peer")
.intType()
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index 6a3997023..4108b486b 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -73,7 +73,7 @@ public class TransportClientFactory implements Closeable {
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
- private AbstractByteBufAllocator byteBufAllocator;
+ private ByteBufAllocator byteBufAllocator;
public TransportClientFactory(TransportContext context) {
this.context = Objects.requireNonNull(context);
@@ -85,11 +85,27 @@ public class TransportClientFactory implements Closeable {
IOMode ioMode = conf.ioMode();
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
this.workerGroup = NettyUtils.createEventLoop(ioMode,
conf.clientThreads(), "netty-rpc-client");
- this.byteBufAllocator =
- conf.isPooledAllocatorEnabled()
- ? NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), false, conf.clientThreads())
- :
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
+ if (conf.isSharedAllocatorEnabled()) {
+ this.byteBufAllocator =
+ conf.isPooledAllocatorEnabled()
+ ? NettyUtils.getSharedPooledByteBufAllocator(
+ conf.preferDirectBufs(), false, conf.clientThreads())
+ :
NettyUtils.getSharedUnpooledByteBufAllocator(conf.preferDirectBufs());
+ } else {
+ this.byteBufAllocator =
+ conf.isPooledAllocatorEnabled()
+ ? NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false, conf.clientThreads())
+ :
NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "isPooledAllocatorEnabled={}, isSharedAllocatorEnabled={},
preferDirectBufs={}, byteBufAllocator={}",
+ conf.isPooledAllocatorEnabled(),
+ conf.isSharedAllocatorEnabled(),
+ conf.preferDirectBufs(),
+ byteBufAllocator);
+ }
}
public TransportClient createClient(String remoteHost, int remotePort, int
partitionId)
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
index 684645080..64a6e912d 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
@@ -57,6 +57,10 @@ public class TransportConf {
return rssConf.get(RssClientConf.NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED);
}
+ public boolean isSharedAllocatorEnabled() {
+ return rssConf.get(RssClientConf.NETTY_CLIENT_SHARED_ALLOCATOR_ENABLED);
+ }
+
public int receiveBuf() {
return rssConf.get(RssClientConf.NETTY_CLIENT_RECEIVE_BUFFER);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
index b8c687ce7..fc7004880 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
@@ -47,7 +47,7 @@ public class Decoders {
long crc = byteBuf.readLong();
long taskAttemptId = byteBuf.readLong();
int dataLength = byteBuf.readInt();
- ByteBuf data =
NettyUtils.getNettyBufferAllocator().directBuffer(dataLength);
+ ByteBuf data =
NettyUtils.getSharedUnpooledByteBufAllocator(true).directBuffer(dataLength);
data.writeBytes(byteBuf, dataLength);
int lengthOfShuffleServers = byteBuf.readInt();
List<ShuffleServerInfo> serverInfos = Lists.newArrayList();
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
index 8408001b9..93a7949d7 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/GrpcNettyUtils.java
@@ -35,9 +35,7 @@ public class GrpcNettyUtils {
int maxOrder,
int smallCacheSize,
int normalCacheSize) {
- if (numCores == 0) {
- numCores = Runtime.getRuntime().availableProcessors();
- }
+ numCores = NettyUtils.defaultNumThreads(numCores);
if (pageSize == 0) {
pageSize = PooledByteBufAllocator.defaultPageSize();
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
index 468d2a7a3..59668b3b1 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
@@ -18,8 +18,8 @@
package org.apache.uniffle.common.util;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReferenceArray;
-import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -43,6 +43,13 @@ public class NettyUtils {
private static final long MAX_DIRECT_MEMORY_IN_BYTES =
PlatformDependent.maxDirectMemory();
+ /** Specifies an upper bound on the number of Netty threads that Uniffle
requires by default. */
+ private static int MAX_DEFAULT_NETTY_THREADS = 8;
+
+ private static final AtomicReferenceArray<PooledByteBufAllocator>
+ SHARED_POOLED_BYTE_BUF_ALLOCATOR = new AtomicReferenceArray<>(2);
+ private static volatile UnpooledByteBufAllocator
sharedUnpooledByteBufAllocator;
+
/** Creates a Netty EventLoopGroup based on the IOMode. */
public static EventLoopGroup createEventLoop(IOMode mode, int numThreads,
String threadPrefix) {
ThreadFactory threadFactory =
ThreadUtils.getNettyThreadFactory(threadPrefix);
@@ -69,11 +76,59 @@ public class NettyUtils {
}
}
- public static PooledByteBufAllocator createPooledByteBufAllocator(
+ /**
+ * Returns the default number of threads for both the Netty client and
server thread pools. If
+ * numUsableCores is 0, we will use Runtime get an approximate number of
available cores.
+ */
+ public static int defaultNumThreads(int numUsableCores) {
+ final int availableCores;
+ if (numUsableCores > 0) {
+ availableCores = numUsableCores;
+ } else {
+ availableCores = Runtime.getRuntime().availableProcessors();
+ }
+ return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
+ }
+
+ /**
+ * Returns the lazily created shared pooled ByteBuf allocator for the
specified allowCache
+ * parameter value.
+ */
+ public static PooledByteBufAllocator getSharedPooledByteBufAllocator(
boolean allowDirectBufs, boolean allowCache, int numCores) {
- if (numCores == 0) {
- numCores = Runtime.getRuntime().availableProcessors();
+ final int index = allowCache ? 0 : 1;
+ PooledByteBufAllocator allocator =
SHARED_POOLED_BYTE_BUF_ALLOCATOR.get(index);
+ if (allocator == null) {
+ synchronized (NettyUtils.class) {
+ allocator = SHARED_POOLED_BYTE_BUF_ALLOCATOR.get(index);
+ if (allocator == null) {
+ allocator = createPooledByteBufAllocator(allowDirectBufs,
allowCache, numCores);
+ SHARED_POOLED_BYTE_BUF_ALLOCATOR.set(index, allocator);
+ }
+ }
}
+ return allocator;
+ }
+
+ /**
+ * Returns the lazily created shared un-pooled ByteBuf allocator for the
specified allowCache
+ * parameter value.
+ */
+ public static synchronized UnpooledByteBufAllocator
getSharedUnpooledByteBufAllocator(
+ boolean allowDirectBufs) {
+ if (sharedUnpooledByteBufAllocator == null) {
+ synchronized (NettyUtils.class) {
+ if (sharedUnpooledByteBufAllocator == null) {
+ sharedUnpooledByteBufAllocator =
createUnpooledByteBufAllocator(allowDirectBufs);
+ }
+ }
+ }
+ return sharedUnpooledByteBufAllocator;
+ }
+
+ public static PooledByteBufAllocator createPooledByteBufAllocator(
+ boolean allowDirectBufs, boolean allowCache, int numCores) {
+ numCores = defaultNumThreads(numCores);
return new PooledByteBufAllocator(
allowDirectBufs && PlatformDependent.directBufferPreferred(),
Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
@@ -117,14 +172,6 @@ public class NettyUtils {
return String.format("[%s -> %s]", channel.localAddress(),
channel.remoteAddress());
}
- private static class AllocatorHolder {
- private static final AbstractByteBufAllocator INSTANCE =
createUnpooledByteBufAllocator(true);
- }
-
- public static AbstractByteBufAllocator getNettyBufferAllocator() {
- return AllocatorHolder.INSTANCE;
- }
-
public static UnpooledByteBufAllocator
createUnpooledByteBufAllocator(boolean preferDirect) {
return new UnpooledByteBufAllocator(preferDirect);
}
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index 85bb326f3..b33aebc1b 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -100,14 +100,15 @@ spark.rss.data.replica.read 2
```
### Netty Setting
-| Property Name | Default |
Description
|
-|----------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| <client_type>.rss.client.type | GRPC |
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the client side for better stability and
performance.
|
-| <client_type>.rss.client.netty.io.mode | NIO |
Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
-| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 |
Connection active timeout.
|
-| <client_type>.rss.client.netty.client.threads | 0 |
Number of threads used in the client thread pool. Default is 0, Netty will use
the number of (available logical cores * 2) as the number of threads.
|
-| <client_type>.rss.client.netty.client.prefer.direct.bufs | true |
If true, we will prefer allocating off-heap byte buffers within Netty.
|
-| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true |
If true, we will use PooledByteBufAllocator to allocate byte buffers within
Netty, otherwise we'll use UnpooledByteBufAllocator.
|
-| <client_type>.rss.client.netty.client.connections.per.peer | 2 |
Suppose there are 100 executors,
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer
will establish a total of (100 * 2) connections with multiple clients.
|
-| <client_type>.rss.client.netty.client.receive.buffer | 0 |
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and
send buffer should be latency * network_bandwidth. Assuming latency = 1ms,
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the
operating system automatically estimates the receive buffer size based on
default settings. |
-| <client_type>.rss.client.netty.client.send.buffer | 0 |
Send buffer size (SO_SNDBUF).
|
+| Property Name | Default |
Description
|
+|----------------------------------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <client_type>.rss.client.type | GRPC |
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the client side for better stability and
performance.
|
+| <client_type>.rss.client.netty.io.mode | NIO |
Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
+| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 |
Connection active timeout.
|
+| <client_type>.rss.client.netty.client.threads | 0 |
Number of threads used in the client thread pool. Default is 0, Netty will use
the number of (available logical cores * 2) as the number of threads.
|
+| <client_type>.rss.client.netty.client.prefer.direct.bufs | true |
If true, we will prefer allocating off-heap byte buffers within Netty.
|
+| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true |
If true, we will use PooledByteBufAllocator to allocate byte buffers within
Netty, otherwise we'll use UnpooledByteBufAllocator.
|
+| <client_type>.rss.client.netty.client.shared.allocator.enabled | true | A
flag indicating whether to share the ByteBuf allocators between the different
Netty channels when enabling Netty. If enabled then only three ByteBuf
allocators are created: one PooledByteBufAllocator where caching is allowed,
one PooledByteBufAllocator where not and one UnpooledByteBufAllocator. When
disabled, a new allocator is created for each transport client. |
+| <client_type>.rss.client.netty.client.connections.per.peer | 2 |
Suppose there are 100 executors,
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer
will establish a total of (100 * 2) connections with multiple clients.
|
+| <client_type>.rss.client.netty.client.receive.buffer | 0 |
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and
send buffer should be latency * network_bandwidth. Assuming latency = 1ms,
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the
operating system automatically estimates the receive buffer size based on
default settings. |
+| <client_type>.rss.client.netty.client.send.buffer | 0 |
Send buffer size (SO_SNDBUF).
|
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
index c3bb5f883..3539b9e90 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-import io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBufAllocator;
import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +76,7 @@ public abstract class GrpcClient {
this.channel = channel;
}
- protected AbstractByteBufAllocator createByteBufAllocator(
+ protected ByteBufAllocator createByteBufAllocator(
int pageSize, int maxOrder, int smallCacheSize) {
return GrpcNettyUtils.createPooledByteBufAllocator(true, 0, 0, 0, 0);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 1c46077ad..806fd3c21 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +108,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
}
@Override
- protected AbstractByteBufAllocator createByteBufAllocator(
+ protected ByteBufAllocator createByteBufAllocator(
int pageSize, int maxOrder, int smallCacheSize) {
LOG.info(
"ShuffleServerGrpcNettyClient is initialized - host:{}, gRPC port:{},
netty port:{}, maxRetryAttempts:{}, usePlaintext:{}, pageSize:{}, maxOrder:{},
smallCacheSize={}",
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 15197bc86..fd5a62280 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -93,7 +93,7 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
if (!bufferSegments.isEmpty()) {
CompositeByteBuf byteBuf =
new CompositeByteBuf(
- NettyUtils.getNettyBufferAllocator(),
+ NettyUtils.getSharedUnpooledByteBufAllocator(true),
true,
Constants.COMPOSITE_BYTE_BUF_MAX_COMPONENTS);
// copy result data
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index f8410fcc9..23ccaff38 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -155,9 +155,9 @@ public class StreamServer implements ServerInterface {
})
.option(ChannelOption.SO_BACKLOG, backlogSize)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
- .option(ChannelOption.ALLOCATOR, NettyUtils.getNettyBufferAllocator())
+ .option(ChannelOption.ALLOCATOR,
NettyUtils.getSharedUnpooledByteBufAllocator(true))
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
- .childOption(ChannelOption.ALLOCATOR,
NettyUtils.getNettyBufferAllocator())
+ .childOption(ChannelOption.ALLOCATOR,
NettyUtils.getSharedUnpooledByteBufAllocator(true))
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);