This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch test-new-record-rpc-format in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7ecffb4b10c95a16167e58abdca8a6093dc99376 Author: Liu Xuxin <[email protected]> AuthorDate: Wed Nov 15 15:40:37 2023 +0800 temp --- .../main/java/org/apache/iotdb/SessionExample.java | 24 +++++++++++----------- .../resources/conf/iotdb-datanode.properties | 2 ++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++++++++ .../org/apache/iotdb/db/service/RPCService.java | 2 +- .../service/AbstractThriftServiceThread.java | 9 ++++---- 6 files changed, 38 insertions(+), 17 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index dc1d1f446bb..768f276dbad 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -110,18 +110,18 @@ public class SessionExample { // deleteTimeseries(); // setTimeout(); -// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); -// sessionEnableRedirect.setEnableQueryRedirection(true); -// sessionEnableRedirect.open(false); -// -// // set session fetchSize -// sessionEnableRedirect.setFetchSize(10000); -// -// fastLastDataQueryForOneDevice(); -// insertRecord4Redirect(); -// query4Redirect(); -// sessionEnableRedirect.close(); -// session.close(); + // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); + // sessionEnableRedirect.setEnableQueryRedirection(true); + // sessionEnableRedirect.open(false); + // + // // set session fetchSize + // sessionEnableRedirect.setFetchSize(10000); + // + // fastLastDataQueryForOneDevice(); + // insertRecord4Redirect(); + // query4Redirect(); + // sessionEnableRedirect.close(); + // session.close(); } private static void createAndDropContinuousQueries() diff --git a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties 
b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties index e6ee742796e..60206138c5a 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties +++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties @@ -107,6 +107,8 @@ dn_seed_config_node=127.0.0.1:10710 # this feature is under development, set this as false before it is done. # dn_rpc_advanced_compression_enable=false +# dn_data_transfer_compression_enable=false + # Datatype: int # dn_rpc_selector_thread_count=1 diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 854401d7b5e..eb1c229a7b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -120,6 +120,8 @@ public class IoTDBConfig { /** whether to use Snappy compression before sending data through the network */ private boolean rpcAdvancedCompressionEnable = false; + private boolean dataTransportCompressionEnable = false; + /** Port which the JDBC server listens to. 
*/ private int rpcPort = 6667; @@ -2589,6 +2591,14 @@ public class IoTDBConfig { RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable); } + public boolean isDataTransportCompressionEnable() { + return dataTransportCompressionEnable; + } + + public void setDataTransportCompressionEnable(boolean dataTransportCompressionEnable) { + this.dataTransportCompressionEnable = dataTransportCompressionEnable; + } + public int getMlogBufferSize() { return mlogBufferSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e4c5c243598..76fac28fe61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -234,6 +234,14 @@ public class IoTDBDescriptor { Boolean.toString(conf.isRpcAdvancedCompressionEnable())) .trim())); + conf.setDataTransportCompressionEnable( + Boolean.parseBoolean( + properties + .getProperty( + "dn_data_transfer_compression_enable", + Boolean.toString(conf.isDataTransportCompressionEnable())) + .trim())); + conf.setConnectionTimeoutInMS( Integer.parseInt( properties diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java index b5c6e3c10e2..0efc54c6eb5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -85,7 +85,7 @@ public class RPCService extends ThriftService implements RPCServiceMBean { config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), - true); + config.isDataTransportCompressionEnable()); } } catch (RPCServiceException e) { diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index eb89232608f..794a7a2d72f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -23,10 +23,10 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.runtime.RPCServiceException; - import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport; + import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; @@ -212,9 +212,10 @@ public abstract class AbstractThriftServiceThread extends Thread { TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); if (compress) { - poolArgs.transportFactory(new RpcTransportFactory( - new TimeoutChangeableTSnappyFramedTransport.Factory( - RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); + poolArgs.transportFactory( + new RpcTransportFactory( + new TimeoutChangeableTSnappyFramedTransport.Factory( + RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); } poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler);
