This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch compressed-wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3b2a01f80852ef2cdc386082f5bcf97e3f120b33
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Nov 13 18:17:15 2023 +0800

    temp
---
 .../main/java/org/apache/iotdb/SessionExample.java | 26 +++++++++++-----------
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  |  2 +-
 .../apache/iotdb/session/SessionConnection.java    |  7 ++++++
 .../org/apache/iotdb/db/service/RPCService.java    |  2 +-
 .../service/AbstractThriftServiceThread.java       |  8 +++++++
 5 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d5f40452ea8..dc1d1f446bb 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -74,7 +74,7 @@ public class SessionExample {
             .password("root")
             .version(Version.V_1_0)
             .build();
-    session.open(false);
+    session.open(true);
 
     // set session fetchSize
     session.setFetchSize(10000);
@@ -110,18 +110,18 @@ public class SessionExample {
     //    deleteTimeseries();
     //    setTimeout();
 
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    fastLastDataQueryForOneDevice();
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
-    session.close();
+//    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+//    sessionEnableRedirect.setEnableQueryRedirection(true);
+//    sessionEnableRedirect.open(false);
+//
+//    // set session fetchSize
+//    sessionEnableRedirect.setFetchSize(10000);
+//
+//    fastLastDataQueryForOneDevice();
+//    insertRecord4Redirect();
+//    query4Redirect();
+//    sessionEnableRedirect.close();
+//    session.close();
   }
 
   private static void createAndDropContinuousQueries()
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index 191f0a42f95..c1d799f2aad 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory {
 
   private final TTransportFactory inner;
 
-  private RpcTransportFactory(TTransportFactory inner) {
+  public RpcTransportFactory(TTransportFactory inner) {
     this.inner = inner;
   }
 
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6cb618e8fa9..68e221807a6 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -162,6 +162,13 @@ public class SessionConnection {
                 session.connectionTimeoutInMs,
                 trustStore,
                 trustStorePwd);
+      } else if (session.enableRPCCompression) {
+        RpcTransportFactory.setUseSnappy(true);
+        RpcTransportFactory.reInit();
+        transport =
+            RpcTransportFactory.INSTANCE.getTransport(
+                // as there is a try-catch already, we do not need to use TSocket.wrap
+                endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs);
       } else {
         transport =
             RpcTransportFactory.INSTANCE.getTransport(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
index af747cb1dd0..b5c6e3c10e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -85,7 +85,7 @@ public class RPCService extends ThriftService implements RPCServiceMBean {
                 config.getRpcMaxConcurrentClientNum(),
                 config.getThriftServerAwaitTimeForStopService(),
                 new RPCServiceThriftHandler(impl),
-                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+                true);
       }
 
     } catch (RPCServiceException e) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 12eea072f32..eb89232608f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport;
 import org.apache.thrift.TBaseAsyncProcessor;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -208,6 +211,11 @@ public abstract class AbstractThriftServiceThread extends Thread {
       serverTransport = openTransport(bindAddress, port);
       TThreadPoolServer.Args poolArgs =
           initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
+      if (compress) {
+        poolArgs.transportFactory(new RpcTransportFactory(
+            new TimeoutChangeableTSnappyFramedTransport.Factory(
+                RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE)));
+      }
       poolServer = new TThreadPoolServer(poolArgs);
       poolServer.setServerEventHandler(serverEventHandler);
     } catch (TTransportException e) {

Reply via email to