This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch test-new-record-rpc-format in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fa719c566bfbc1cb3943b6b637f8f542320596d8 Author: Liu Xuxin <[email protected]> AuthorDate: Mon Nov 13 18:17:15 2023 +0800 temp --- .../main/java/org/apache/iotdb/SessionExample.java | 26 +++++++++++----------- .../org/apache/iotdb/rpc/RpcTransportFactory.java | 2 +- .../apache/iotdb/session/SessionConnection.java | 7 ++++++ .../org/apache/iotdb/db/service/RPCService.java | 2 +- .../service/AbstractThriftServiceThread.java | 8 +++++++ 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f40452ea8..dc1d1f446bb 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -74,7 +74,7 @@ public class SessionExample { .password("root") .version(Version.V_1_0) .build(); - session.open(false); + session.open(true); // set session fetchSize session.setFetchSize(10000); @@ -110,18 +110,18 @@ public class SessionExample { // deleteTimeseries(); // setTimeout(); - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); - session.close(); +// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); +// sessionEnableRedirect.setEnableQueryRedirection(true); +// sessionEnableRedirect.open(false); +// +// // set session fetchSize +// sessionEnableRedirect.setFetchSize(10000); +// +// fastLastDataQueryForOneDevice(); +// insertRecord4Redirect(); +// query4Redirect(); +// sessionEnableRedirect.close(); +// session.close(); } private static void createAndDropContinuousQueries() diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java index 191f0a42f95..c1d799f2aad 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java @@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory { private final TTransportFactory inner; - private RpcTransportFactory(TTransportFactory inner) { + public RpcTransportFactory(TTransportFactory inner) { this.inner = inner; } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 6cb618e8fa9..68e221807a6 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -162,6 +162,13 @@ public class SessionConnection { session.connectionTimeoutInMs, trustStore, trustStorePwd); + } else if (session.enableRPCCompression) { + RpcTransportFactory.setUseSnappy(true); + RpcTransportFactory.reInit(); + transport = + RpcTransportFactory.INSTANCE.getTransport( + // as there is a try-catch already, we do not need to use TSocket.wrap + endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs); } else { transport = RpcTransportFactory.INSTANCE.getTransport( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java index af747cb1dd0..b5c6e3c10e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -85,7 +85,7 @@ public class RPCService extends ThriftService implements RPCServiceMBean { config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), - IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()); + true); } } catch (RPCServiceException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index 12eea072f32..eb89232608f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -24,6 +24,9 @@ import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.runtime.RPCServiceException; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport; import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; @@ -208,6 +211,11 @@ public abstract class AbstractThriftServiceThread extends Thread { serverTransport = openTransport(bindAddress, port); TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); + if (compress) { + poolArgs.transportFactory(new RpcTransportFactory( + new TimeoutChangeableTSnappyFramedTransport.Factory( + RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); + } poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler); } catch (TTransportException e) {
