This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 563db23d5 RATIS-2404. Validate message size before sending async requests. (#1345)
563db23d5 is described below

commit 563db23d587e671c40f3e93f4267d483c63dd3ce
Author: slfan1989 <[email protected]>
AuthorDate: Wed Feb 11 08:10:35 2026 +0800

    RATIS-2404. Validate message size before sending async requests. (#1345)
---
 .../grpc/client/GrpcClientProtocolClient.java      | 18 +++++++----
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 35 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 159919fab..0eaec6b96 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -82,6 +82,7 @@ public class GrpcClientProtocolClient implements Closeable {
   private final ManagedChannel clientChannel;
   private final ManagedChannel adminChannel;
 
+  private final SizeInBytes maxMessageSize;
   private final TimeDuration requestTimeoutDuration;
   private final TimeDuration watchRequestTimeoutDuration;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@@ -99,7 +100,7 @@ public class GrpcClientProtocolClient implements Closeable {
     this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
     final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
-    final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
+    this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
     metricClientInterceptor = new MetricClientInterceptor(getName());
 
     final String clientAddress = Optional.ofNullable(target.getClientAddress())
@@ -108,9 +109,9 @@ public class GrpcClientProtocolClient implements Closeable {
         .filter(x -> !x.isEmpty()).orElse(target.getAddress());
     final boolean separateAdminChannel = !Objects.equals(clientAddress, adminAddress);
 
-    clientChannel = buildChannel(clientAddress, clientSslContext, flowControlWindow, maxMessageSize);
+    clientChannel = buildChannel(clientAddress, clientSslContext, flowControlWindow);
     adminChannel = separateAdminChannel
-        ? buildChannel(adminAddress, adminSslContext, flowControlWindow, maxMessageSize)
+        ? buildChannel(adminAddress, adminSslContext, flowControlWindow)
         : clientChannel;
 
     asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
@@ -121,7 +122,7 @@ public class GrpcClientProtocolClient implements Closeable {
   }
 
   private ManagedChannel buildChannel(String address, SslContext sslContext,
-      SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
+      SizeInBytes flowControlWindow) {
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forTarget(address);
     // ignore any http proxy for grpc
@@ -332,13 +333,20 @@ public class GrpcClientProtocolClient implements Closeable {
     }
 
     CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
+      final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
+      if (proto.getSerializedSize() > maxMessageSize.getSizeInt()) {
+        return JavaUtils.completeExceptionally(new IllegalArgumentException(getName()
+            + ": request serialized size " + proto.getSerializedSize()
+            + " exceeds maximum " + maxMessageSize + " for " + request));
+      }
+
       final long callId = request.getCallId();
       final CompletableFuture<RaftClientReply> f = replies.putNew(callId);
       if (f == null) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + " is closed."));
       }
       try {
-        if (!requestStreamer.onNext(ClientProtoUtils.toRaftClientRequestProto(request))) {
+        if (!requestStreamer.onNext(proto)) {
           return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + ": the stream is closed."));
         }
       } catch(Exception t) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 100fb8d5d..b5247cf63 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -63,6 +63,8 @@ import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.event.Level;
@@ -100,6 +102,39 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
     RaftClientConfigKeys.Rpc.setRequestTimeout(p, TimeDuration.valueOf(1, TimeUnit.SECONDS));
   }
 
+  @Test
+  @Timeout(value = 30, unit = TimeUnit.SECONDS)
+  public void testAsyncRequestExceedsMaxMessageSize() throws Exception {
+    final RaftProperties properties = getProperties();
+    final SizeInBytes originalMessageSize = GrpcConfigKeys.messageSizeMax(properties, s -> {});
+    final SizeInBytes originalBufferLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+    final boolean originalSendDummyRequest =
+        RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties);
+
+    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, SizeInBytes.valueOf("16KB"));
+    final SizeInBytes testMessageSizeMax = SizeInBytes.valueOf("1040KB");
+    GrpcConfigKeys.setMessageSizeMax(properties, testMessageSizeMax);
+    RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties, false);
+
+    try {
+      runWithNewCluster(1, cluster -> {
+        try (RaftClient client = cluster.createClient(RetryPolicies.noRetry())) {
+          final int oversizedKb = 1200;
+          final byte[] bytes = new byte[oversizedKb * 1024]; // > 1040KB
+          final SimpleMessage message = new SimpleMessage("oversized", ByteString.copyFrom(bytes));
+
+          testFailureCaseAsync("async oversized request",
+              () -> client.async().send(message),
+              IllegalArgumentException.class);
+        }
+      });
+    } finally {
+      GrpcConfigKeys.setMessageSizeMax(properties, originalMessageSize);
+      RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, originalBufferLimit);
+      RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties, originalSendDummyRequest);
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("data")
   public void testServerRestartOnException(Boolean separateHeartbeat) throws Exception {

Reply via email to