This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 26385f31a RATIS-2096. Add a conf to enable/disable zero copy. (#1099)
26385f31a is described below

commit 26385f31a0bd5c70390dd99a9a2c6ad17ba76414
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri May 24 12:06:10 2024 -0700

    RATIS-2096. Add a conf to enable/disable zero copy. (#1099)
---
 .../src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java    |  9 +++++++++
 .../apache/ratis/grpc/server/GrpcClientProtocolService.java    |  8 +++++++-
 .../apache/ratis/grpc/server/GrpcServerProtocolService.java    |  9 ++++++++-
 .../main/java/org/apache/ratis/grpc/server/GrpcService.java    | 10 ++++++----
 .../java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java    |  2 ++
 5 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 8caacfeeb..c14d844ee 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -292,6 +292,15 @@ public interface GrpcConfigKeys {
       setTimeDuration(properties::setTimeDuration,
           LOG_MESSAGE_BATCH_DURATION_KEY, logMessageBatchDuration);
     }
+
+    String ZERO_COPY_ENABLED_KEY = PREFIX + ".zerocopy.enabled";
+    boolean ZERO_COPY_ENABLED_DEFAULT = false;
+    static boolean zeroCopyEnabled(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, ZERO_COPY_ENABLED_KEY, 
ZERO_COPY_ENABLED_DEFAULT, getDefaultLog());
+    }
+    static void setZeroCopyEnabled(RaftProperties properties, boolean enabled) 
{
+      setBoolean(properties::setBoolean, ZERO_COPY_ENABLED_KEY, enabled);
+    }
   }
 
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index d1ef99c74..80a9a439b 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -152,13 +152,15 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
   private final ExecutorService executor;
 
   private final OrderedStreamObservers orderedStreamObservers = new 
OrderedStreamObservers();
+  private final boolean zeroCopyEnabled;
   private final ZeroCopyMessageMarshaller<RaftClientRequestProto> 
zeroCopyRequestMarshaller;
 
   GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, 
RaftClientAsynchronousProtocol protocol,
-      ExecutorService executor, ZeroCopyMetrics zeroCopyMetrics) {
+      ExecutorService executor, boolean zeroCopyEnabled, ZeroCopyMetrics 
zeroCopyMetrics) {
     this.idSupplier = idSupplier;
     this.protocol = protocol;
     this.executor = executor;
+    this.zeroCopyEnabled = zeroCopyEnabled;
     this.zeroCopyRequestMarshaller = new 
ZeroCopyMessageMarshaller<>(RaftClientRequestProto.getDefaultInstance(),
         zeroCopyMetrics::onZeroCopyMessage, 
zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
     zeroCopyMetrics.addUnreleased("client_protocol", 
zeroCopyRequestMarshaller::getUnclosedCount);
@@ -170,6 +172,10 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
 
   ServerServiceDefinition bindServiceWithZeroCopy() {
     ServerServiceDefinition orig = super.bindService();
+    if (!zeroCopyEnabled) {
+      LOG.info("{}: Zero copy is disabled.", getId());
+      return orig;
+    }
     ServerServiceDefinition.Builder builder = 
ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
 
     addMethodWithCustomMarshaller(orig, builder, getOrderedMethod(), 
zeroCopyRequestMarshaller);
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 4257f0038..451d74c64 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -225,11 +225,14 @@ class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase {
 
   private final Supplier<RaftPeerId> idSupplier;
   private final RaftServer server;
+  private final boolean zeroCopyEnabled;
   private final ZeroCopyMessageMarshaller<AppendEntriesRequestProto> 
zeroCopyRequestMarshaller;
 
-  GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer 
server, ZeroCopyMetrics zeroCopyMetrics) {
+  GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer 
server, boolean zeroCopyEnabled,
+      ZeroCopyMetrics zeroCopyMetrics) {
     this.idSupplier = idSupplier;
     this.server = server;
+    this.zeroCopyEnabled = zeroCopyEnabled;
     this.zeroCopyRequestMarshaller = new 
ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(),
         zeroCopyMetrics::onZeroCopyMessage, 
zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
     zeroCopyMetrics.addUnreleased("server_protocol", 
zeroCopyRequestMarshaller::getUnclosedCount);
@@ -241,6 +244,10 @@ class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase {
 
   ServerServiceDefinition bindServiceWithZeroCopy() {
     ServerServiceDefinition orig = super.bindService();
+    if (!zeroCopyEnabled) {
+      LOG.info("{}: Zero copy is disabled.", getId());
+      return orig;
+    }
     ServerServiceDefinition.Builder builder = 
ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
 
     // Add appendEntries with zero copy marshaller.
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index fa9358604..e3c0a5edd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -177,7 +177,8 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
         
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
         GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
         RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
-        GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
+        GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()),
+        GrpcConfigKeys.Server.zeroCopyEnabled(server.getProperties()));
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber") // private constructor
@@ -187,7 +188,7 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
       String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
       SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
       SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
-      boolean useSeparateHBChannel) {
+      boolean useSeparateHBChannel, boolean zeroCopyEnabled) {
     super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
         p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
             requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
@@ -203,7 +204,8 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
         GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
         getId() + "-request-");
     this.zeroCopyMetrics = new ZeroCopyMetrics();
-    this.clientProtocolService = new GrpcClientProtocolService(idSupplier, 
raftServer, executor, zeroCopyMetrics);
+    this.clientProtocolService = new GrpcClientProtocolService(idSupplier, 
raftServer, executor,
+        zeroCopyEnabled, zeroCopyMetrics);
 
     this.serverInterceptor = new MetricServerInterceptor(
         idSupplier,
@@ -216,7 +218,7 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
     final NettyServerBuilder serverBuilder =
         startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, 
grpcMessageSizeMax, flowControlWindow);
     GrpcServerProtocolService serverProtocolService = new 
GrpcServerProtocolService(idSupplier, raftServer,
-        zeroCopyMetrics);
+        zeroCopyEnabled, zeroCopyMetrics);
     serverBuilder.addService(ServerInterceptors.intercept(
         serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor));
     if (!separateAdminServer) {
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index aeee7c050..bd3b13750 100644
--- 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -76,6 +76,8 @@ public class MiniRaftClusterWithGrpc extends 
MiniRaftCluster.RpcBase {
         GrpcConfigKeys.Client.setPort(properties, 
NetUtils.createSocketAddr(address).getPort()));
     Optional.ofNullable(getAddress(id, group, 
RaftPeer::getAdminAddress)).ifPresent(address ->
         GrpcConfigKeys.Admin.setPort(properties, 
NetUtils.createSocketAddr(address).getPort()));
+    // Always run gRPC integration tests with zero-copy enabled because the non-zero-copy path is low-risk.
+    GrpcConfigKeys.Server.setZeroCopyEnabled(properties, true);
     return parameters;
   }
 

Reply via email to