This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 26385f31a RATIS-2096. Add a conf to enable/disable zero copy. (#1099)
26385f31a is described below
commit 26385f31a0bd5c70390dd99a9a2c6ad17ba76414
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri May 24 12:06:10 2024 -0700
RATIS-2096. Add a conf to enable/disable zero copy. (#1099)
---
.../src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java | 9 +++++++++
.../apache/ratis/grpc/server/GrpcClientProtocolService.java | 8 +++++++-
.../apache/ratis/grpc/server/GrpcServerProtocolService.java | 9 ++++++++-
.../main/java/org/apache/ratis/grpc/server/GrpcService.java | 10 ++++++----
.../java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 2 ++
5 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 8caacfeeb..c14d844ee 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -292,6 +292,15 @@ public interface GrpcConfigKeys {
setTimeDuration(properties::setTimeDuration,
LOG_MESSAGE_BATCH_DURATION_KEY, logMessageBatchDuration);
}
+
+ String ZERO_COPY_ENABLED_KEY = PREFIX + ".zerocopy.enabled";
+ boolean ZERO_COPY_ENABLED_DEFAULT = false;
+ static boolean zeroCopyEnabled(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, ZERO_COPY_ENABLED_KEY,
ZERO_COPY_ENABLED_DEFAULT, getDefaultLog());
+ }
+ static void setZeroCopyEnabled(RaftProperties properties, boolean enabled)
{
+ setBoolean(properties::setBoolean, ZERO_COPY_ENABLED_KEY, enabled);
+ }
}
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index d1ef99c74..80a9a439b 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -152,13 +152,15 @@ class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase {
private final ExecutorService executor;
private final OrderedStreamObservers orderedStreamObservers = new
OrderedStreamObservers();
+ private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<RaftClientRequestProto>
zeroCopyRequestMarshaller;
GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier,
RaftClientAsynchronousProtocol protocol,
- ExecutorService executor, ZeroCopyMetrics zeroCopyMetrics) {
+ ExecutorService executor, boolean zeroCopyEnabled, ZeroCopyMetrics
zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.protocol = protocol;
this.executor = executor;
+ this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new
ZeroCopyMessageMarshaller<>(RaftClientRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage,
zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics.addUnreleased("client_protocol",
zeroCopyRequestMarshaller::getUnclosedCount);
@@ -170,6 +172,10 @@ class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase {
ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
+ if (!zeroCopyEnabled) {
+ LOG.info("{}: Zero copy is disabled.", getId());
+ return orig;
+ }
ServerServiceDefinition.Builder builder =
ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
addMethodWithCustomMarshaller(orig, builder, getOrderedMethod(),
zeroCopyRequestMarshaller);
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 4257f0038..451d74c64 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -225,11 +225,14 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
private final Supplier<RaftPeerId> idSupplier;
private final RaftServer server;
+ private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<AppendEntriesRequestProto>
zeroCopyRequestMarshaller;
- GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer
server, ZeroCopyMetrics zeroCopyMetrics) {
+ GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer
server, boolean zeroCopyEnabled,
+ ZeroCopyMetrics zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.server = server;
+ this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new
ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage,
zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics.addUnreleased("server_protocol",
zeroCopyRequestMarshaller::getUnclosedCount);
@@ -241,6 +244,10 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
+ if (!zeroCopyEnabled) {
+ LOG.info("{}: Zero copy is disabled.", getId());
+ return orig;
+ }
ServerServiceDefinition.Builder builder =
ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
// Add appendEntries with zero copy marshaller.
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index fa9358604..e3c0a5edd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -177,7 +177,8 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
- GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
+ GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()),
+ GrpcConfigKeys.Server.zeroCopyEnabled(server.getProperties()));
}
@SuppressWarnings("checkstyle:ParameterNumber") // private constructor
@@ -187,7 +188,7 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
- boolean useSeparateHBChannel) {
+ boolean useSeparateHBChannel, boolean zeroCopyEnabled) {
super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
@@ -203,7 +204,8 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
getId() + "-request-");
this.zeroCopyMetrics = new ZeroCopyMetrics();
- this.clientProtocolService = new GrpcClientProtocolService(idSupplier,
raftServer, executor, zeroCopyMetrics);
+ this.clientProtocolService = new GrpcClientProtocolService(idSupplier,
raftServer, executor,
+ zeroCopyEnabled, zeroCopyMetrics);
this.serverInterceptor = new MetricServerInterceptor(
idSupplier,
@@ -216,7 +218,7 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
final NettyServerBuilder serverBuilder =
startBuildingNettyServer(serverHost, serverPort, serverTlsConfig,
grpcMessageSizeMax, flowControlWindow);
GrpcServerProtocolService serverProtocolService = new
GrpcServerProtocolService(idSupplier, raftServer,
- zeroCopyMetrics);
+ zeroCopyEnabled, zeroCopyMetrics);
serverBuilder.addService(ServerInterceptors.intercept(
serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor));
if (!separateAdminServer) {
diff --git
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index aeee7c050..bd3b13750 100644
---
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -76,6 +76,8 @@ public class MiniRaftClusterWithGrpc extends
MiniRaftCluster.RpcBase {
GrpcConfigKeys.Client.setPort(properties,
NetUtils.createSocketAddr(address).getPort()));
Optional.ofNullable(getAddress(id, group,
RaftPeer::getAdminAddress)).ifPresent(address ->
GrpcConfigKeys.Admin.setPort(properties,
NetUtils.createSocketAddr(address).getPort()));
+ // Always run grpc integration tests with zero-copy enabled because the non-zero-copy path is not risky.
+ GrpcConfigKeys.Server.setZeroCopyEnabled(properties, true);
return parameters;
}