This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 563db23d5 RATIS-2404. Validate message size before sending async
requests. (#1345)
563db23d5 is described below
commit 563db23d587e671c40f3e93f4267d483c63dd3ce
Author: slfan1989 <[email protected]>
AuthorDate: Wed Feb 11 08:10:35 2026 +0800
RATIS-2404. Validate message size before sending async requests. (#1345)
---
.../grpc/client/GrpcClientProtocolClient.java | 18 +++++++----
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 35 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 5 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 159919fab..0eaec6b96 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -82,6 +82,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final ManagedChannel clientChannel;
private final ManagedChannel adminChannel;
+ private final SizeInBytes maxMessageSize;
private final TimeDuration requestTimeoutDuration;
private final TimeDuration watchRequestTimeoutDuration;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@@ -99,7 +100,7 @@ public class GrpcClientProtocolClient implements Closeable {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
final SizeInBytes flowControlWindow =
GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
- final SizeInBytes maxMessageSize =
GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
+ this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties,
LOG::debug);
metricClientInterceptor = new MetricClientInterceptor(getName());
final String clientAddress = Optional.ofNullable(target.getClientAddress())
@@ -108,9 +109,9 @@ public class GrpcClientProtocolClient implements Closeable {
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
final boolean separateAdminChannel = !Objects.equals(clientAddress,
adminAddress);
- clientChannel = buildChannel(clientAddress, clientSslContext,
flowControlWindow, maxMessageSize);
+ clientChannel = buildChannel(clientAddress, clientSslContext,
flowControlWindow);
adminChannel = separateAdminChannel
- ? buildChannel(adminAddress, adminSslContext, flowControlWindow,
maxMessageSize)
+ ? buildChannel(adminAddress, adminSslContext, flowControlWindow)
: clientChannel;
asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
@@ -121,7 +122,7 @@ public class GrpcClientProtocolClient implements Closeable {
}
private ManagedChannel buildChannel(String address, SslContext sslContext,
- SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
+ SizeInBytes flowControlWindow) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(address);
// ignore any http proxy for grpc
@@ -332,13 +333,20 @@ public class GrpcClientProtocolClient implements
Closeable {
}
CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
+ final RaftClientRequestProto proto =
ClientProtoUtils.toRaftClientRequestProto(request);
+ if (proto.getSerializedSize() > maxMessageSize.getSizeInt()) {
+ return JavaUtils.completeExceptionally(new
IllegalArgumentException(getName()
+ + ": request serialized size " + proto.getSerializedSize()
+ + " exceeds maximum " + maxMessageSize + " for " + request));
+ }
+
final long callId = request.getCallId();
final CompletableFuture<RaftClientReply> f = replies.putNew(callId);
if (f == null) {
return JavaUtils.completeExceptionally(new
AlreadyClosedException(getName() + " is closed."));
}
try {
- if
(!requestStreamer.onNext(ClientProtoUtils.toRaftClientRequestProto(request))) {
+ if (!requestStreamer.onNext(proto)) {
return JavaUtils.completeExceptionally(new
AlreadyClosedException(getName() + ": the stream is closed."));
}
} catch(Exception t) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 100fb8d5d..b5247cf63 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -63,6 +63,8 @@ import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
@@ -100,6 +102,39 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
RaftClientConfigKeys.Rpc.setRequestTimeout(p, TimeDuration.valueOf(1,
TimeUnit.SECONDS));
}
+ @Test
+ @Timeout(value = 30, unit = TimeUnit.SECONDS)
+ public void testAsyncRequestExceedsMaxMessageSize() throws Exception {
+ final RaftProperties properties = getProperties();
+ final SizeInBytes originalMessageSize =
GrpcConfigKeys.messageSizeMax(properties, s -> {});
+ final SizeInBytes originalBufferLimit =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+ final boolean originalSendDummyRequest =
+ RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties);
+
+ RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
SizeInBytes.valueOf("16KB"));
+ final SizeInBytes testMessageSizeMax = SizeInBytes.valueOf("1040KB");
+ GrpcConfigKeys.setMessageSizeMax(properties, testMessageSizeMax);
+ RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties,
false);
+
+ try {
+ runWithNewCluster(1, cluster -> {
+ try (RaftClient client =
cluster.createClient(RetryPolicies.noRetry())) {
+ final int oversizedKb = 1200;
+ final byte[] bytes = new byte[oversizedKb * 1024]; // > 1040KB
+ final SimpleMessage message = new SimpleMessage("oversized",
ByteString.copyFrom(bytes));
+
+ testFailureCaseAsync("async oversized request",
+ () -> client.async().send(message),
+ IllegalArgumentException.class);
+ }
+ });
+ } finally {
+ GrpcConfigKeys.setMessageSizeMax(properties, originalMessageSize);
+ RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
originalBufferLimit);
+ RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties,
originalSendDummyRequest);
+ }
+ }
+
@ParameterizedTest
@MethodSource("data")
public void testServerRestartOnException(Boolean separateHeartbeat) throws
Exception {