This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch lwh/frameSize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d3be441772faad3f3d3ea2c1a638a44fa4d0e213 Author: Weihao Li <[email protected]> AuthorDate: Tue Apr 21 19:05:21 2026 +0800 fix Signed-off-by: Weihao Li <[email protected]> --- .../rpc/TCompressedElasticFramedTransport.java | 3 ++- .../apache/iotdb/rpc/TElasticFramedTransport.java | 23 ++++++++++++++++++---- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../scheduler/FragmentInstanceDispatcherImpl.java | 11 +++++++++++ 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index 5b9c81ec58b..62abc28e470 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -44,7 +44,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); RpcStat.readCompressedBytes.addAndGet(size); try { @@ -69,6 +69,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr writeCompressBuffer.resizeIfNecessary(maxCompressedLength); int compressedLength = compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0); + checkWriteFrameSize(compressedLength); RpcStat.writeCompressedBytes.addAndGet(compressedLength); TFramedTransport.encodeFrameSize(compressedLength, i32buf); underlying.write(i32buf, 0, 4); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 31e0f0b6960..f1d46dbb434 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -174,11 +174,11 @@ public class TElasticFramedTransport extends TTransport { protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); } - protected void checkFrameSize(int size) throws TTransportException { + protected void validateFrame(int size) throws TTransportException { final int HTTP_GET_SIGNATURE = 0x47455420; // "GET " final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST" final int TLS_MIN_VERSION = 0x160300; @@ -196,8 +196,6 @@ public class TElasticFramedTransport extends TTransport { error = FrameError.TLS_REQUEST; } else if (size < 0) { error = FrameError.NEGATIVE_FRAME_SIZE; - } else if (size > thriftMaxFrameSize) { - error = FrameError.FRAME_SIZE_EXCEEDED; } } @@ -241,9 +239,26 @@ public class TElasticFramedTransport extends TTransport { } } + protected void checkWriteFrameSize(int size) throws TTransportException { + if (size <= thriftMaxFrameSize) { + return; + } + SocketAddress remoteAddress = null; + if (underlying instanceof TSocket) { + remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); + } + String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress; + String message = + String.format( + FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo); + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, message); + } + @Override public void flush() throws TTransportException { int length = writeBuffer.getPos(); + checkWriteFrameSize(length); TFramedTransport.encodeFrameSize(length, i32buf); underlying.write(i32buf, 0, 4); underlying.write(writeBuffer.getBuffer(), 0, length); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index f9e750f4012..9fb0bb2f464 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -51,6 +51,7 @@ public enum TSStatusCode { INTERNAL_SERVER_ERROR(305), DISPATCH_ERROR(306), LICENSE_ERROR(307), + THRIFT_FRAME_OVERSIZE(308), // Client, REDIRECTION_RECOMMEND(400), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index c52f8f94eb2..dba36cc2993 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.Preconditions; @@ -549,6 +550,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } + } catch (TException e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof TTransportException + && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) { + queryContext.addFailedEndPoint(endPoint); + throw new FragmentInstanceDispatchException( + new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()) + .setMessage(rootCause.getMessage())); + } + throw e; } }
