This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3d0c91a74 [#1773] improvement(server): Reduce chunkSize of gRPC
Netty's PooledByteBufAllocator to reduce memory usage (#1780)
3d0c91a74 is described below
commit 3d0c91a74dad5cc5b5d436b3442d01cb7304986c
Author: RickyMa <[email protected]>
AuthorDate: Thu Jun 13 15:59:22 2024 +0800
[#1773] improvement(server): Reduce chunkSize of gRPC Netty's
PooledByteBufAllocator to reduce memory usage (#1780)
### What changes were proposed in this pull request?
Introduce configs to reduce the chunkSize of the gRPC internal Netty's
PooledByteBufAllocator to reduce memory usage on the server-side.
When Netty is enabled (GRPC_NETTY), the settings of gRPC's internal Netty
allocator change as follows:
`io.netty.allocator.pageSize`: 8192 -> 4096 (bytes)
`io.netty.allocator.maxOrder`: 11 -> 3
`io.netty.allocator.smallCacheSize`: 128 -> 1024
Note: `chunkSize` = `pageSize` * 2^`maxOrder`.
Thus, the `io.netty.allocator.chunkSize`: 2MB -> 32KB (new value: 4096 * 2^3 = 32KB)
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1773.
### Does this PR introduce _any_ user-facing change?
No. This PR will not change anything when using GRPC. It will only take
effect when using GRPC_NETTY.
Adds configs: `rss.rpc.netty.pageSize`, `rss.rpc.netty.maxOrder`,
`rss.rpc.netty.smallCacheSize`
### How was this patch tested?
Existing tests.
---
.../apache/uniffle/common/config/RssBaseConf.java | 24 ++++++++++++++++++++++
.../org/apache/uniffle/common/rpc/GrpcServer.java | 19 +++++++++++++++--
docs/client_guide/client_guide.md | 4 ++++
docs/server_guide.md | 3 +++
4 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 436f803ed..039d3c831 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -52,6 +52,30 @@ public class RssBaseConf extends RssConf {
.defaultValue(true)
.withDescription("If enable metrics for rpc connection");
+ public static final ConfigOption<Integer> RPC_NETTY_PAGE_SIZE =
+ ConfigOptions.key("rss.rpc.netty.pageSize")
+ .intType()
+ .defaultValue(4096)
+ .withDescription(
+ "The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the server-side. "
+ + "This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.");
+
+ public static final ConfigOption<Integer> RPC_NETTY_MAX_ORDER =
+ ConfigOptions.key("rss.rpc.netty.maxOrder")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the server-side. "
+ + "This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.");
+
+ public static final ConfigOption<Integer> RPC_NETTY_SMALL_CACHE_SIZE =
+ ConfigOptions.key("rss.rpc.netty.smallCacheSize")
+ .intType()
+ .defaultValue(1024)
+ .withDescription(
+ "The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the server-side. "
+ + "This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.");
+
public static final ConfigOption<Integer> JETTY_HTTP_PORT =
ConfigOptions.key("rss.jetty.http.port")
.intType()
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index cd7f88952..cf1f05fea 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -35,6 +35,9 @@ import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.GrpcNettyUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -93,10 +97,21 @@ public class GrpcServer implements ServerInterface {
private Server buildGrpcServer(int serverPort) {
boolean isMetricsEnabled =
rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
long maxInboundMessageSize =
rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
+ ServerType serverType = rssConf.get(RssBaseConf.RPC_SERVER_TYPE);
+ int pageSize = rssConf.getInteger(RssBaseConf.RPC_NETTY_PAGE_SIZE);
+ int maxOrder = rssConf.getInteger(RssBaseConf.RPC_NETTY_MAX_ORDER);
+ int smallCacheSize =
rssConf.getInteger(RssBaseConf.RPC_NETTY_SMALL_CACHE_SIZE);
+ PooledByteBufAllocator pooledByteBufAllocator =
+ serverType == ServerType.GRPC
+ ? GrpcNettyUtils.createPooledByteBufAllocator(true, 0, 0, 0, 0)
+ : GrpcNettyUtils.createPooledByteBufAllocatorWithSmallCacheOnly(
+ true, 0, pageSize, maxOrder, smallCacheSize);
ServerBuilder<?> builder =
- ServerBuilder.forPort(serverPort)
+ NettyServerBuilder.forPort(serverPort)
.executor(pool)
- .maxInboundMessageSize((int) maxInboundMessageSize);
+ .maxInboundMessageSize((int) maxInboundMessageSize)
+ .withOption(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
+ .withChildOption(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
if (isMetricsEnabled) {
builder.addTransportFilter(new
MonitoringServerTransportFilter(grpcMetrics));
}
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index 28e6ca88c..85bb326f3 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -56,6 +56,10 @@ The important configuration of client is listed as
following. These configuratio
| <client_type>.rss.client.max.concurrency.of.per-partition.write | -
| The maximum number of files that can be written
concurrently to a single partition is determined. This value will only be
respected by the remote shuffle server if it is greater than 0.
[...]
| <client_type>.rss.client.rpc.timeout.ms | 60000
| Timeout in milliseconds for RPC calls.
[...]
| <client_type>.rss.client.rpc.maxAttempts | 3
| When we fail to send RPC calls, we will retry for
maxAttempts times.
[...]
+| <client_type>.rss.client.rpc.netty.pageSize | 4096
| The value of pageSize for PooledByteBufAllocator
when using gRPC internal Netty on the client-side. This configuration will only
take effect when rss.rpc.server.type is set to GRPC_NETTY.
[...]
+| <client_type>.rss.client.rpc.netty.maxOrder | 3
| The value of maxOrder for PooledByteBufAllocator
when using gRPC internal Netty on the client-side. This configuration will only
take effect when rss.rpc.server.type is set to GRPC_NETTY.
[...]
+| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024
| The value of smallCacheSize for
PooledByteBufAllocator when using gRPC internal Netty on the client-side. This
configuration will only take effect when rss.rpc.server.type is set to
GRPC_NETTY.
[...]
+
Notice:
1. `<client_type>` should be `mapreduce` `tez` or `spark`
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 4fecff93b..d2d5e968c 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -74,6 +74,9 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.coordinator.quorum | -
| Coordinator quorum
[...]
| rss.rpc.server.type | GRPC
| Shuffle server type,
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the server side for better stability and
performance.
[...]
| rss.rpc.server.port | 19999
| RPC port for Shuffle
server, if set zero, grpc server start on random port.
[...]
+| rss.rpc.netty.pageSize | 4096
| The value of pageSize
for PooledByteBufAllocator when using gRPC internal Netty on the server-side.
This configuration will only take effect when rss.rpc.server.type is set to
GRPC_NETTY.
[...]
+| rss.rpc.netty.maxOrder | 3
| The value of maxOrder
for PooledByteBufAllocator when using gRPC internal Netty on the server-side.
This configuration will only take effect when rss.rpc.server.type is set to
GRPC_NETTY.
[...]
+| rss.rpc.netty.smallCacheSize | 1024
| The value of
smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the
server-side. This configuration will only take effect when rss.rpc.server.type
is set to GRPC_NETTY.
[...]
| rss.jetty.http.port | 19998
| Http port for Shuffle
server
[...]
| rss.server.netty.port | -1
| Netty port for Shuffle
server, if set zero, Netty server start on random port.
[...]
| rss.server.netty.epoll.enable | false
| Whether to enable epoll
model with Netty server.
[...]