This is an automated email from the ASF dual-hosted git repository.
rainyu pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 48c5bb2361 Add maxMessageSize config (#15781)
48c5bb2361 is described below
commit 48c5bb236191f09642c84d562552b66ea460ecec
Author: Rain Yu <[email protected]>
AuthorDate: Mon Nov 24 10:02:30 2025 +0800
Add maxMessageSize config (#15781)
Add maxMessageSize config
---------
Co-authored-by: earthchen <[email protected]>
---
.../apache/dubbo/config/nested/TripleConfig.java | 21 +++++++++++++++++++++
.../http12/message/LengthFieldStreamingDecoder.java | 17 +++++++++++++++++
.../main/java/org/apache/dubbo/rpc/Constants.java | 3 +++
.../dubbo/rpc/protocol/tri/compressor/Bzip2.java | 17 +++++++++++++++++
.../dubbo/rpc/protocol/tri/compressor/Gzip.java | 18 ++++++++++++++++++
.../dubbo/rpc/protocol/tri/frame/TriDecoder.java | 14 ++++++++++++++
6 files changed, 90 insertions(+)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
index 854cb2d051..3ad3a08c5b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
@@ -41,6 +41,9 @@ public class TripleConfig implements Serializable {
public static final int DEFAULT_CONNECTION_INITIAL_WINDOW_SIZE_KEY = 65_536;
public static final int DEFAULT_MAX_FRAME_SIZE = 8_388_608;
public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 32_768;
+ public static final int DEFAULT_MAX_MESSAGE_SIZE = 50 * 1024 * 1024;
+
+ public static final String H2_SETTINGS_MAX_MESSAGE_SIZE_KEY = "dubbo.protocol.triple.max-message-size";
/**
* Whether enable verbose mode.
@@ -143,6 +146,11 @@ public class TripleConfig implements Serializable {
*/
private Integer maxHeaderListSize;
+ /**
+ * Maximum message size.
+ */
+ private Integer maxMessageSize;
+
@Nested
private RestConfig rest;
@@ -334,6 +342,19 @@ public class TripleConfig implements Serializable {
this.maxHeaderListSize = maxHeaderListSize;
}
+ public Integer getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+ @Parameter(excluded = true, key = H2_SETTINGS_MAX_MESSAGE_SIZE_KEY)
+ public int getMaxMessageSizeOrDefault() {
+ return maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize;
+ }
+
+ public void setMaxMessageSize(Integer maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
public RestConfig getRest() {
return rest;
}
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index 77821ea431..b68a371411 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -16,8 +16,13 @@
*/
package org.apache.dubbo.remoting.http12.message;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.remoting.http12.CompositeInputStream;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -43,6 +48,8 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
private final int lengthFieldLength;
+ private final int maxMessageSize;
+
private int requiredLength;
public LengthFieldStreamingDecoder() {
@@ -57,6 +64,8 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
this.lengthFieldOffset = lengthFieldOffset;
this.lengthFieldLength = lengthFieldLength;
this.requiredLength = lengthFieldOffset + lengthFieldLength;
+ Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+ this.maxMessageSize = conf.getInt(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
}
@Override
@@ -150,6 +159,14 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
ignore = accumulate.read(lengthBytes);
requiredLength = bytesToInt(lengthBytes);
+ // Validate bounds
+ if (requiredLength < 0) {
+ throw new RpcException("Invalid message length: " + requiredLength);
+ }
+ if (requiredLength > maxMessageSize) {
+ throw new RpcException(String.format("Message size %d exceeds limit %d", requiredLength, maxMessageSize));
+ }
+
// Continue reading the frame body.
state = DecodeState.PAYLOAD;
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index bf64a3578f..7a81acd21f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -109,6 +109,9 @@ public interface Constants {
String H2_SETTINGS_BUILTIN_SERVICE_INIT = "dubbo.tri.builtin.service.init";
String H2_SETTINGS_JSON_FRAMEWORK_NAME = "dubbo.protocol.triple.rest.json-framework";
+
+ String H2_SETTINGS_MAX_MESSAGE_SIZE = "dubbo.protocol.triple.max-message-size";
+
String H2_SETTINGS_DISALLOWED_CONTENT_TYPES = "dubbo.protocol.triple.rest.disallowed-content-types";
String H2_SETTINGS_OPENAPI_PREFIX = "dubbo.protocol.triple.rest.openapi";
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java
index ab2c1bd3fe..c76ea1459a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java
@@ -16,7 +16,11 @@
*/
package org.apache.dubbo.rpc.protocol.tri.compressor;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -35,11 +39,18 @@ public class Bzip2 implements Compressor, DeCompressor {
public static final String BZIP2 = "bzip2";
+ private final int maxMessageSize;
+
@Override
public String getMessageEncoding() {
return BZIP2;
}
+ public Bzip2() {
+ Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+ this.maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
+ }
+
@Override
public byte[] compress(byte[] payloadByteArr) throws RpcException {
if (null == payloadByteArr || 0 == payloadByteArr.length) {
@@ -77,10 +88,16 @@ public class Bzip2 implements Compressor, DeCompressor {
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayInputStream in = new ByteArrayInputStream(payloadByteArr);
try {
+ int totalBytesRead = 0;
BZip2CompressorInputStream unZip = new BZip2CompressorInputStream(in);
byte[] buffer = new byte[2048];
int n;
while ((n = unZip.read(buffer)) >= 0) {
+ totalBytesRead += n;
+ if (totalBytesRead > maxMessageSize) {
+ throw new RpcException("Decompressed message size " + totalBytesRead
+ + " exceeds the maximum configured message size " + maxMessageSize);
+ }
out.write(buffer, 0, n);
}
} catch (Exception e) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java
index 2623ded6b1..1318b5d756 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java
@@ -16,7 +16,11 @@
*/
package org.apache.dubbo.rpc.protocol.tri.compressor;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -32,6 +36,13 @@ public class Gzip implements Compressor, DeCompressor {
public static final String GZIP = "gzip";
+ private final int maxMessageSize;
+
+ public Gzip() {
+ Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+ this.maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
+ }
+
@Override
public String getMessageEncoding() {
return GZIP;
@@ -70,10 +81,17 @@ public class Gzip implements Compressor, DeCompressor {
ByteArrayInputStream byteInStream = new ByteArrayInputStream(payloadByteArr);
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+
try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteInStream)) {
int readByteNum;
+ int totalBytesRead = 0;
byte[] bufferArr = new byte[256];
while ((readByteNum = gzipInputStream.read(bufferArr)) >= 0) {
+ totalBytesRead += readByteNum;
+ if (totalBytesRead > maxMessageSize) {
+ throw new RpcException("Decompressed message size " + totalBytesRead
+ + " exceeds the maximum configured message size " + maxMessageSize);
+ }
byteOutStream.write(bufferArr, 0, readByteNum);
}
} catch (Exception exception) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java
index 6970203cf8..b511fff284 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java
@@ -16,7 +16,11 @@
*/
package org.apache.dubbo.rpc.protocol.tri.frame;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import io.netty.buffer.ByteBuf;
@@ -31,6 +35,7 @@ public class TriDecoder implements Deframer {
private final CompositeByteBuf accumulate = Unpooled.compositeBuffer();
private final Listener listener;
private final DeCompressor decompressor;
+ private final Integer maxMessageSize;
private boolean compressedFlag;
private long pendingDeliveries;
private boolean inDelivery = false;
@@ -42,6 +47,8 @@ public class TriDecoder implements Deframer {
private GrpcDecodeState state = GrpcDecodeState.HEADER;
public TriDecoder(DeCompressor decompressor, Listener listener) {
+ Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+ maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
this.decompressor = decompressor;
this.listener = listener;
}
@@ -123,6 +130,13 @@ public class TriDecoder implements Deframer {
requiredLength = accumulate.readInt();
+ if (requiredLength < 0) {
+ throw new RpcException("Invalid message length: " + requiredLength);
+ }
+ if (requiredLength > maxMessageSize) {
+ throw new RpcException(String.format("Message size %d exceeds limit %d", requiredLength, maxMessageSize));
+ }
+
// Continue reading the frame body.
state = GrpcDecodeState.PAYLOAD;
}