This is an automated email from the ASF dual-hosted git repository.

rainyu pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 48c5bb2361 Add maxMessageSize config  (#15781)
48c5bb2361 is described below

commit 48c5bb236191f09642c84d562552b66ea460ecec
Author: Rain Yu <[email protected]>
AuthorDate: Mon Nov 24 10:02:30 2025 +0800

    Add maxMessageSize config  (#15781)
    
    Add maxMessageSize config
    ---------
    
    Co-authored-by: earthchen <[email protected]>
---
 .../apache/dubbo/config/nested/TripleConfig.java    | 21 +++++++++++++++++++++
 .../http12/message/LengthFieldStreamingDecoder.java | 17 +++++++++++++++++
 .../main/java/org/apache/dubbo/rpc/Constants.java   |  3 +++
 .../dubbo/rpc/protocol/tri/compressor/Bzip2.java    | 17 +++++++++++++++++
 .../dubbo/rpc/protocol/tri/compressor/Gzip.java     | 18 ++++++++++++++++++
 .../dubbo/rpc/protocol/tri/frame/TriDecoder.java    | 14 ++++++++++++++
 6 files changed, 90 insertions(+)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
index 854cb2d051..3ad3a08c5b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
@@ -41,6 +41,9 @@ public class TripleConfig implements Serializable {
     public static final int DEFAULT_CONNECTION_INITIAL_WINDOW_SIZE_KEY = 65_536;
     public static final int DEFAULT_MAX_FRAME_SIZE = 8_388_608;
     public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 32_768;
+    public static final int DEFAULT_MAX_MESSAGE_SIZE = 50 * 1024 * 1024;
+
+    public static final String H2_SETTINGS_MAX_MESSAGE_SIZE_KEY = "dubbo.protocol.triple.max-message-size";
 
     /**
      * Whether enable verbose mode.
@@ -143,6 +146,11 @@ public class TripleConfig implements Serializable {
      */
     private Integer maxHeaderListSize;
 
+    /**
+     * Maximum message size.
+     */
+    private Integer maxMessageSize;
+
     @Nested
     private RestConfig rest;
 
@@ -334,6 +342,19 @@ public class TripleConfig implements Serializable {
         this.maxHeaderListSize = maxHeaderListSize;
     }
 
+    public Integer getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
+    @Parameter(excluded = true, key = H2_SETTINGS_MAX_MESSAGE_SIZE_KEY)
+    public int getMaxMessageSizeOrDefault() {
+        return maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize;
+    }
+
+    public void setMaxMessageSize(Integer maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+
     public RestConfig getRest() {
         return rest;
     }
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index 77821ea431..b68a371411 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -16,8 +16,13 @@
  */
 package org.apache.dubbo.remoting.http12.message;
 
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.remoting.http12.CompositeInputStream;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -43,6 +48,8 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
 
     private final int lengthFieldLength;
 
+    private final int maxMessageSize;
+
     private int requiredLength;
 
     public LengthFieldStreamingDecoder() {
@@ -57,6 +64,8 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
         this.lengthFieldOffset = lengthFieldOffset;
         this.lengthFieldLength = lengthFieldLength;
         this.requiredLength = lengthFieldOffset + lengthFieldLength;
+        Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+        this.maxMessageSize = conf.getInt(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
     }
 
     @Override
@@ -150,6 +159,14 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
         ignore = accumulate.read(lengthBytes);
         requiredLength = bytesToInt(lengthBytes);
 
+        // Validate bounds
+        if (requiredLength < 0) {
+            throw new RpcException("Invalid message length: " + requiredLength);
+        }
+        if (requiredLength > maxMessageSize) {
+            throw new RpcException(String.format("Message size %d exceeds limit %d", requiredLength, maxMessageSize));
+        }
+
         // Continue reading the frame body.
         state = DecodeState.PAYLOAD;
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index bf64a3578f..7a81acd21f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -109,6 +109,9 @@ public interface Constants {
     String H2_SETTINGS_BUILTIN_SERVICE_INIT = "dubbo.tri.builtin.service.init";
 
     String H2_SETTINGS_JSON_FRAMEWORK_NAME = "dubbo.protocol.triple.rest.json-framework";
+
+    String H2_SETTINGS_MAX_MESSAGE_SIZE = "dubbo.protocol.triple.max-message-size";
+
     String H2_SETTINGS_DISALLOWED_CONTENT_TYPES = "dubbo.protocol.triple.rest.disallowed-content-types";
     String H2_SETTINGS_OPENAPI_PREFIX = "dubbo.protocol.triple.rest.openapi";
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java
index ab2c1bd3fe..c76ea1459a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java
@@ -16,7 +16,11 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.compressor;
 
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.Constants;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -35,11 +39,18 @@ public class Bzip2 implements Compressor, DeCompressor {
 
     public static final String BZIP2 = "bzip2";
 
+    private final int maxMessageSize;
+
     @Override
     public String getMessageEncoding() {
         return BZIP2;
     }
 
+    public Bzip2() {
+        Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+        this.maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
+    }
+
     @Override
     public byte[] compress(byte[] payloadByteArr) throws RpcException {
         if (null == payloadByteArr || 0 == payloadByteArr.length) {
@@ -77,10 +88,16 @@ public class Bzip2 implements Compressor, DeCompressor {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         ByteArrayInputStream in = new ByteArrayInputStream(payloadByteArr);
         try {
+            int totalBytesRead = 0;
             BZip2CompressorInputStream unZip = new BZip2CompressorInputStream(in);
             byte[] buffer = new byte[2048];
             int n;
             while ((n = unZip.read(buffer)) >= 0) {
+                totalBytesRead += n;
+                if (totalBytesRead > maxMessageSize) {
+                    throw new RpcException("Decompressed message size " + totalBytesRead
+                            + " exceeds the maximum configured message size " + maxMessageSize);
+                }
                 out.write(buffer, 0, n);
             }
         } catch (Exception e) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java
index 2623ded6b1..1318b5d756 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java
@@ -16,7 +16,11 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.compressor;
 
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.Constants;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -32,6 +36,13 @@ public class Gzip implements Compressor, DeCompressor {
 
     public static final String GZIP = "gzip";
 
+    private final int maxMessageSize;
+
+    public Gzip() {
+        Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+        this.maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
+    }
+
     @Override
     public String getMessageEncoding() {
         return GZIP;
@@ -70,10 +81,17 @@ public class Gzip implements Compressor, DeCompressor {
 
         ByteArrayInputStream byteInStream = new ByteArrayInputStream(payloadByteArr);
         ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+
         try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteInStream)) {
             int readByteNum;
+            int totalBytesRead = 0;
             byte[] bufferArr = new byte[256];
             while ((readByteNum = gzipInputStream.read(bufferArr)) >= 0) {
+                totalBytesRead += readByteNum;
+                if (totalBytesRead > maxMessageSize) {
+                    throw new RpcException("Decompressed message size " + totalBytesRead
+                            + " exceeds the maximum configured message size " + maxMessageSize);
+                }
                 byteOutStream.write(bufferArr, 0, readByteNum);
             }
         } catch (Exception exception) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java
index 6970203cf8..b511fff284 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java
@@ -16,7 +16,11 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.frame;
 
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.Constants;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
 
 import io.netty.buffer.ByteBuf;
@@ -31,6 +35,7 @@ public class TriDecoder implements Deframer {
     private final CompositeByteBuf accumulate = Unpooled.compositeBuffer();
     private final Listener listener;
     private final DeCompressor decompressor;
+    private final Integer maxMessageSize;
     private boolean compressedFlag;
     private long pendingDeliveries;
     private boolean inDelivery = false;
@@ -42,6 +47,8 @@ public class TriDecoder implements Deframer {
     private GrpcDecodeState state = GrpcDecodeState.HEADER;
 
     public TriDecoder(DeCompressor decompressor, Listener listener) {
+        Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
+        maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
         this.decompressor = decompressor;
         this.listener = listener;
     }
@@ -123,6 +130,13 @@ public class TriDecoder implements Deframer {
 
         requiredLength = accumulate.readInt();
 
+        if (requiredLength < 0) {
+            throw new RpcException("Invalid message length: " + requiredLength);
+        }
+        if (requiredLength > maxMessageSize) {
+            throw new RpcException(String.format("Message size %d exceeds limit %d", requiredLength, maxMessageSize));
+        }
+
         // Continue reading the frame body.
         state = GrpcDecodeState.PAYLOAD;
     }

Reply via email to