This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch lwh/frameSize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d3be441772faad3f3d3ea2c1a638a44fa4d0e213
Author: Weihao Li <[email protected]>
AuthorDate: Tue Apr 21 19:05:21 2026 +0800

    fix
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../rpc/TCompressedElasticFramedTransport.java     |  3 ++-
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 23 ++++++++++++++++++----
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 11 +++++++++++
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 5b9c81ec58b..62abc28e470 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -44,7 +44,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
   protected void readFrame() throws TTransportException {
     underlying.readAll(i32buf, 0, 4);
     int size = TFramedTransport.decodeFrameSize(i32buf);
-    checkFrameSize(size);
+    validateFrame(size);
     readBuffer.fill(underlying, size);
     RpcStat.readCompressedBytes.addAndGet(size);
     try {
@@ -69,6 +69,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
       writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
       int compressedLength =
           compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0);
+      checkWriteFrameSize(compressedLength);
       RpcStat.writeCompressedBytes.addAndGet(compressedLength);
       TFramedTransport.encodeFrameSize(compressedLength, i32buf);
       underlying.write(i32buf, 0, 4);
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 31e0f0b6960..f1d46dbb434 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -174,11 +174,11 @@ public class TElasticFramedTransport extends TTransport {
   protected void readFrame() throws TTransportException {
     underlying.readAll(i32buf, 0, 4);
     int size = TFramedTransport.decodeFrameSize(i32buf);
-    checkFrameSize(size);
+    validateFrame(size);
     readBuffer.fill(underlying, size);
   }
 
-  protected void checkFrameSize(int size) throws TTransportException {
+  protected void validateFrame(int size) throws TTransportException {
     final int HTTP_GET_SIGNATURE = 0x47455420; // "GET "
     final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST"
     final int TLS_MIN_VERSION = 0x160300;
@@ -196,8 +196,6 @@ public class TElasticFramedTransport extends TTransport {
         error = FrameError.TLS_REQUEST;
       } else if (size < 0) {
         error = FrameError.NEGATIVE_FRAME_SIZE;
-      } else if (size > thriftMaxFrameSize) {
-        error = FrameError.FRAME_SIZE_EXCEEDED;
       }
     }
 
@@ -241,9 +239,26 @@ public class TElasticFramedTransport extends TTransport {
     }
   }
 
+  protected void checkWriteFrameSize(int size) throws TTransportException {
+    if (size <= thriftMaxFrameSize) {
+      return;
+    }
+    SocketAddress remoteAddress = null;
+    if (underlying instanceof TSocket) {
+      remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress();
+    }
+    String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress;
+    String message =
+        String.format(
+            FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo);
+    close();
+    throw new TTransportException(TTransportException.CORRUPTED_DATA, message);
+  }
+
   @Override
   public void flush() throws TTransportException {
     int length = writeBuffer.getPos();
+    checkWriteFrameSize(length);
     TFramedTransport.encodeFrameSize(length, i32buf);
     underlying.write(i32buf, 0, 4);
     underlying.write(writeBuffer.getBuffer(), 0, length);
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f9e750f4012..9fb0bb2f464 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -51,6 +51,7 @@ public enum TSStatusCode {
   INTERNAL_SERVER_ERROR(305),
   DISPATCH_ERROR(306),
   LICENSE_ERROR(307),
+  THRIFT_FRAME_OVERSIZE(308),
 
   // Client,
   REDIRECTION_RECOMMEND(400),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index c52f8f94eb2..dba36cc2993 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.Preconditions;
@@ -549,6 +550,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
                   TSStatusCode.EXECUTE_STATEMENT_ERROR,
                   String.format("unknown read type [%s]", instance.getType())));
       }
+    } catch (TException e) {
+      Throwable rootCause = ExceptionUtils.getRootCause(e);
+      if (rootCause instanceof TTransportException
+          && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) {
+        queryContext.addFailedEndPoint(endPoint);
+        throw new FragmentInstanceDispatchException(
+            new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode())
+                .setMessage(rootCause.getMessage()));
+      }
+      throw e;
     }
   }
 

Reply via email to