This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ce054e88e2f473d21c40616076cec18621cdc90b Author: Haonan <[email protected]> AuthorDate: Mon Nov 10 18:15:48 2025 +0800 Optimize the configuration logic of dn_thrift_max_frame_size (#16724) * Add thrift max frame size calculate logic * Add thrift max frame size calculate logic * fix review (cherry picked from commit ffa9c562c4eef4ee7f695fb4ef3311c50634a181) --- .../rpc/TCompressedElasticFramedTransport.java | 8 +- .../apache/iotdb/rpc/TElasticFramedTransport.java | 99 +++++++++++++--------- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 - .../conf/iotdb-system.properties.template | 4 +- .../apache/iotdb/commons/conf/IoTDBConstant.java | 1 - 6 files changed, 69 insertions(+), 57 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index a3b4f38064a..5b9c81ec58b 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -44,13 +44,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - - if (size < 0) { - close(); - throw new TTransportException( - TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); - } - + checkFrameSize(size); readBuffer.fill(underlying, size); RpcStat.readCompressedBytes.addAndGet(size); try { diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 6008988a809..cd5eea30993 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -169,54 +169,71 @@ public class TElasticFramedTransport extends TTransport { protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); + checkFrameSize(size); + readBuffer.fill(underlying, size); + } - if (size < 0) { - close(); - throw new TTransportException( - TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); + protected void checkFrameSize(int size) throws TTransportException { + final int HTTP_GET_SIGNATURE = 0x47455420; // "GET " + final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST" + final int TLS_MIN_VERSION = 0x160300; + final int TLS_MAX_VERSION = 0x160303; + final int TLS_LENGTH_HIGH_MAX = 0x02; + + FrameError error = null; + if (size == HTTP_GET_SIGNATURE || size == HTTP_POST_SIGNATURE) { + error = FrameError.HTTP_REQUEST; + } else { + int high24 = size >>> 8; + if (high24 >= TLS_MIN_VERSION + && high24 <= TLS_MAX_VERSION + && (i32buf[3] & 0xFF) <= TLS_LENGTH_HIGH_MAX) { + error = FrameError.TLS_REQUEST; + } else if (size < 0) { + error = FrameError.NEGATIVE_FRAME_SIZE; + } else if (size > thriftMaxFrameSize) { + error = FrameError.FRAME_SIZE_EXCEEDED; + } } - if (size > thriftMaxFrameSize) { - SocketAddress remoteAddress = null; - if (underlying instanceof TSocket) { - remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); - } - close(); - if (size == 1195725856L || size == 1347375956L) { - // if someone sends HTTP GET/POST to this port, the size will be read as the following - throw new TTransportException( - TTransportException.CORRUPTED_DATA, - String.format( - "Singular frame size (%d) detected, you may be sending HTTP GET/POST" - + "%s requests to the Thrift-RPC port, " - + "please confirm that you are using the right port", - size, remoteAddress == null ? "" : " from " + remoteAddress)); - } else { - throw new TTransportException( - TTransportException.CORRUPTED_DATA, - "Frame size (" + size + ") larger than protect max size (" + thriftMaxFrameSize + ")!"); - } + if (error == null) { + return; } - int high24 = size >>> 8; - if (high24 >= 0x160300 && high24 <= 0x160303 && (i32buf[3] & 0xFF) <= 0x02) { - // The typical TLS ClientHello requests start with 0x160300 ~ 0x160303 - // The 4th byte is typically in [0x00, 0x01, 0x02]. - SocketAddress remoteAddress = null; - if (underlying instanceof TSocket) { - remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); - } - close(); - throw new TTransportException( - TTransportException.CORRUPTED_DATA, - String.format( - "Singular frame size (%d) detected, you may be sending TLS ClientHello requests" - + "%s to the Non-SSL Thrift-RPC" - + " port, please confirm that you are using the right configuration", - size, remoteAddress == null ? "" : " from " + remoteAddress)); + SocketAddress remoteAddress = null; + if (underlying instanceof TSocket) { + remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); } + String remoteInfo = (remoteAddress == null) ? "" : " from " + remoteAddress; + close(); - readBuffer.fill(underlying, size); + error.throwException(size, remoteInfo, thriftMaxFrameSize); + } + + private enum FrameError { + HTTP_REQUEST( + "Singular frame size (%d) detected, you may be sending HTTP GET/POST%s " + + "requests to the Thrift-RPC port, please confirm that you are using the right port"), + TLS_REQUEST( + "Singular frame size (%d) detected, you may be sending TLS ClientHello " + + "requests%s to the Non-SSL Thrift-RPC port, please confirm that you are using " + + "the right configuration"), + NEGATIVE_FRAME_SIZE("Read a negative frame size (%d)%s!"), + FRAME_SIZE_EXCEEDED("Frame size (%d) larger than protect max size (%d)%s!"); + + private final String messageFormat; + + FrameError(String messageFormat) { + this.messageFormat = messageFormat; + } + + void throwException(int size, String remoteInfo, int maxSize) throws TTransportException { + String message = + (this == FRAME_SIZE_EXCEEDED) + ? String.format(messageFormat, size, maxSize, remoteInfo) + : String.format(messageFormat, size, remoteInfo); + throw new TTransportException(TTransportException.CORRUPTED_DATA, message); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index bb62d241cc9..36a148430cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -807,7 +807,7 @@ public class IoTDBConfig { private float udfCollectorMemoryBudgetInMB = (float) (1.0 / 3 * udfMemoryBudgetInMB); /** Unit: byte */ - private int thriftMaxFrameSize = 536870912; + private int thriftMaxFrameSize = getDefaultThriftMaxFrameSize(); private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY; @@ -2572,10 +2572,16 @@ public class IoTDBConfig { } public void setThriftMaxFrameSize(int thriftMaxFrameSize) { - this.thriftMaxFrameSize = thriftMaxFrameSize; + this.thriftMaxFrameSize = + thriftMaxFrameSize <= 0 ? getDefaultThriftMaxFrameSize() : thriftMaxFrameSize; BaseRpcTransportFactory.setThriftMaxFrameSize(this.thriftMaxFrameSize); } + private static int getDefaultThriftMaxFrameSize() { + return Math.min( + 64 * 1024 * 1024, (int) Math.min(Runtime.getRuntime().maxMemory() / 64, Integer.MAX_VALUE)); + } + public int getThriftDefaultBufferSize() { return thriftDefaultBufferSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 7e903948c19..6ee056d0685 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -803,10 +803,6 @@ public class IoTDBDescriptor { properties.getProperty( "dn_thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize())))); - if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) { - conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2); - } - conf.setThriftDefaultBufferSize( Integer.parseInt( properties.getProperty( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index e4d6b56895a..5a32d6a1231 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -533,10 +533,10 @@ dn_rpc_selector_thread_count=1 # Datatype: int dn_rpc_max_concurrent_client_num=1000 -# thrift max frame size, 512MB by default +# thrift max frame size in bytes. When set to 0, use min(64MB, datanode heap memory / 64) # effectiveMode: restart # Datatype: int -dn_thrift_max_frame_size=536870912 +dn_thrift_max_frame_size=0 # thrift init buffer size # effectiveMode: restart diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 8b4352cb95a..adf72842797 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -285,7 +285,6 @@ public class IoTDBConstant { public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size"; // thrift - public static final int LEFT_SIZE_IN_REQUEST = 4 * 1024 * 1024; public static final int DEFAULT_FETCH_SIZE = 5000; public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
