This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b056644f28 Thrift zero-copy optimization for TElasticFramedTransport (#12050)
5b056644f28 is described below

commit 5b056644f283b0ce07b025746895436ac0431c4f
Author: Mrquan <[email protected]>
AuthorDate: Fri Mar 15 18:03:46 2024 +0800

    Thrift zero-copy optimization for TElasticFramedTransport (#12050)
---
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     | 10 ++--
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 65 ----------------------
 ...rtFactory.java => BaseRpcTransportFactory.java} | 30 +++-------
 .../iotdb/rpc/DeepCopyRpcTransportFactory.java     | 46 +++++++++++++++
 .../rpc/TCompressedElasticFramedTransport.java     |  7 ++-
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 47 +++++++++++++---
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   | 22 +++++---
 .../rpc/TimeoutChangeableTFastFramedTransport.java | 13 +++--
 .../TimeoutChangeableTSnappyFramedTransport.java   | 12 ++--
 .../iotdb/rpc/ZeroCopyRpcTransportFactory.java     | 46 +++++++++++++++
 .../apache/iotdb/session/SessionConnection.java    | 10 ++--
 .../org/apache/iotdb/session/ThriftConnection.java | 10 ++--
 .../service/thrift/ConfigNodeRPCService.java       |  4 +-
 .../iot/client/SyncIoTConsensusServiceClient.java  |  4 +-
 .../iot/service/IoTConsensusRPCService.java        |  4 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  9 +--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  4 +-
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  4 +-
 .../execution/exchange/MPPDataExchangeService.java |  4 +-
 .../db/service/DataNodeInternalRPCService.java     |  4 +-
 .../org/apache/iotdb/db/service/RPCService.java    |  8 ++-
 .../client/sync/SyncConfigNodeIServiceClient.java  |  4 +-
 .../sync/SyncDataNodeInternalServiceClient.java    |  4 +-
 .../SyncDataNodeMPPDataExchangeServiceClient.java  |  4 +-
 .../pipe/connector/client/IoTDBSyncClient.java     |  6 +-
 .../service/AbstractThriftServiceThread.java       | 18 ++++--
 .../iotdb/commons/service/ThriftServiceThread.java | 25 ++++-----
 .../client/mock/MockInternalRPCService.java        |  4 +-
 29 files changed, 256 insertions(+), 178 deletions(-)

diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 39cd2f54012..9f8142fb1b3 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.jdbc;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -465,12 +465,12 @@ public class IoTDBConnection implements Connection {
   }
 
   private void openTransport() throws TTransportException {
-    
RpcTransportFactory.setDefaultBufferCapacity(params.getThriftDefaultBufferSize());
-    RpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
+    
DeepCopyRpcTransportFactory.setDefaultBufferCapacity(params.getThriftDefaultBufferSize());
+    
DeepCopyRpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
 
     if (params.isUseSSL()) {
       transport =
-          RpcTransportFactory.INSTANCE.getTransport(
+          DeepCopyRpcTransportFactory.INSTANCE.getTransport(
               params.getHost(),
               params.getPort(),
               getNetworkTimeout(),
@@ -478,7 +478,7 @@ public class IoTDBConnection implements Connection {
               params.getTrustStorePwd());
     } else {
       transport =
-          RpcTransportFactory.INSTANCE.getTransport(
+          DeepCopyRpcTransportFactory.INSTANCE.getTransport(
               params.getHost(), params.getPort(), getNetworkTimeout());
     }
     if (!transport.isOpen()) {
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 3ff05033b98..1c95abed37f 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -29,13 +29,9 @@ import 
org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.thrift.TException;
 
-import java.nio.ByteBuffer;
 import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -46,8 +42,6 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
 
 public class IoTDBStatement implements Statement {
 
@@ -269,7 +263,6 @@ public class IoTDBStatement implements Statement {
       throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
     }
 
-    deepCopyResp(execResp);
     if (execResp.isSetColumns()) {
       queryId = execResp.getQueryId();
       if (execResp.queryResult == null) {
@@ -418,9 +411,6 @@ public class IoTDBStatement implements Statement {
       throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
     }
 
-    // Because different result sets share the TTransport and buffer, if the 
previous result set was
-    // not consumed timely, the byte buffer will be overwritten by the 
incoming result set
-    deepCopyResp(execResp);
     BitSet aliasColumn = null;
     if (execResp.getAliasColumns() != null && 
!execResp.getAliasColumns().isEmpty()) {
       aliasColumn = listToBitSet(execResp.getAliasColumns());
@@ -475,61 +465,6 @@ public class IoTDBStatement implements Statement {
     return BitSet.valueOf(byteAlias);
   }
 
-  private void deepCopyResp(TSExecuteStatementResp queryRes) {
-    final TSQueryDataSet tsQueryDataSet = queryRes.getQueryDataSet();
-    final TSQueryNonAlignDataSet nonAlignDataSet = 
queryRes.getNonAlignQueryDataSet();
-
-    if (Objects.nonNull(tsQueryDataSet)) {
-      deepCopyTsQueryDataSet(tsQueryDataSet);
-    } else if (Objects.nonNull(nonAlignDataSet)) {
-      deepCopyNonAlignQueryDataSet(nonAlignDataSet);
-    } else {
-      deepCopyQueryResult(queryRes);
-    }
-  }
-
-  private void deepCopyQueryResult(TSExecuteStatementResp queryRes) {
-    List<ByteBuffer> queryResult = queryRes.getQueryResult();
-    if (queryResult == null) {
-      return;
-    }
-    final List<ByteBuffer> queryResultCopy =
-        
queryResult.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
-    queryRes.setQueryResult(queryResultCopy);
-  }
-
-  private void deepCopyNonAlignQueryDataSet(TSQueryNonAlignDataSet 
nonAlignDataSet) {
-    if (Objects.isNull(nonAlignDataSet)) {
-      return;
-    }
-
-    final List<ByteBuffer> valueList =
-        nonAlignDataSet.valueList.stream()
-            .map(ReadWriteIOUtils::clone)
-            .collect(Collectors.toList());
-
-    final List<ByteBuffer> timeList =
-        
nonAlignDataSet.timeList.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
-
-    nonAlignDataSet.setTimeList(timeList);
-    nonAlignDataSet.setValueList(valueList);
-  }
-
-  private void deepCopyTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
-    final ByteBuffer time = ReadWriteIOUtils.clone(tsQueryDataSet.time);
-    final List<ByteBuffer> valueList =
-        
tsQueryDataSet.valueList.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
-
-    final List<ByteBuffer> bitmapList =
-        tsQueryDataSet.bitmapList.stream()
-            .map(ReadWriteIOUtils::clone)
-            .collect(Collectors.toList());
-
-    tsQueryDataSet.setBitmapList(bitmapList);
-    tsQueryDataSet.setValueList(valueList);
-    tsQueryDataSet.setTime(time);
-  }
-
   @Override
   public int executeUpdate(String sql) throws SQLException {
     checkConnection("execute update");
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/BaseRpcTransportFactory.java
similarity index 79%
rename from iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
rename to iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/BaseRpcTransportFactory.java
index 191f0a42f95..61fcf52671f 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/BaseRpcTransportFactory.java
@@ -27,22 +27,17 @@ import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
 @SuppressWarnings("java:S1135") // ignore todos
-public class RpcTransportFactory extends TTransportFactory {
+public class BaseRpcTransportFactory extends TTransportFactory {
 
   // TODO: make it a config
   public static boolean USE_SNAPPY = false;
-  public static RpcTransportFactory INSTANCE;
 
-  private static int thriftDefaultBufferSize = 
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
-  private static int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;
+  protected static int thriftDefaultBufferSize = 
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
+  protected static int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;
 
-  static {
-    reInit();
-  }
-
-  private final TTransportFactory inner;
+  protected final TTransportFactory inner;
 
-  private RpcTransportFactory(TTransportFactory inner) {
+  protected BaseRpcTransportFactory(TTransportFactory inner) {
     this.inner = inner;
   }
 
@@ -103,21 +98,10 @@ public class RpcTransportFactory extends TTransportFactory 
{
   }
 
   public static void setDefaultBufferCapacity(int thriftDefaultBufferSize) {
-    RpcTransportFactory.thriftDefaultBufferSize = thriftDefaultBufferSize;
+    BaseRpcTransportFactory.thriftDefaultBufferSize = thriftDefaultBufferSize;
   }
 
   public static void setThriftMaxFrameSize(int thriftMaxFrameSize) {
-    RpcTransportFactory.thriftMaxFrameSize = thriftMaxFrameSize;
-  }
-
-  public static void reInit() {
-    INSTANCE =
-        USE_SNAPPY
-            ? new RpcTransportFactory(
-                new TimeoutChangeableTSnappyFramedTransport.Factory(
-                    thriftDefaultBufferSize, thriftMaxFrameSize))
-            : new RpcTransportFactory(
-                new TimeoutChangeableTFastFramedTransport.Factory(
-                    thriftDefaultBufferSize, thriftMaxFrameSize));
+    BaseRpcTransportFactory.thriftMaxFrameSize = thriftMaxFrameSize;
   }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/DeepCopyRpcTransportFactory.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/DeepCopyRpcTransportFactory.java
new file mode 100644
index 00000000000..83f41275f2e
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/DeepCopyRpcTransportFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TTransportFactory;
+
+public class DeepCopyRpcTransportFactory extends BaseRpcTransportFactory {
+
+  public static DeepCopyRpcTransportFactory INSTANCE;
+
+  static {
+    reInit();
+  }
+
+  private DeepCopyRpcTransportFactory(TTransportFactory inner) {
+    super(inner);
+  }
+
+  public static void reInit() {
+    INSTANCE =
+        USE_SNAPPY
+            ? new DeepCopyRpcTransportFactory(
+                new TimeoutChangeableTSnappyFramedTransport.Factory(
+                    thriftDefaultBufferSize, thriftMaxFrameSize, true))
+            : new DeepCopyRpcTransportFactory(
+                new TElasticFramedTransport.Factory(
+                    thriftDefaultBufferSize, thriftMaxFrameSize, true));
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index e8cb88f2c86..a3b4f38064a 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -31,8 +31,11 @@ public abstract class TCompressedElasticFramedTransport 
extends TElasticFramedTr
   private AutoScalingBufferReadTransport readCompressBuffer;
 
   protected TCompressedElasticFramedTransport(
-      TTransport underlying, int thriftDefaultBufferSize, int 
thriftMaxFrameSize) {
-    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+      TTransport underlying,
+      int thriftDefaultBufferSize,
+      int thriftMaxFrameSize,
+      boolean copyBinary) {
+    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     writeCompressBuffer = new 
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
     readCompressBuffer = new 
AutoScalingBufferReadTransport(thriftDefaultBufferSize);
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 084a3240d84..1075e1de865 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -46,30 +46,38 @@ public class TElasticFramedTransport extends TTransport {
      */
     protected final int thriftDefaultBufferSize;
 
+    /**
+     * When copyBinary flag is true, the transport will copy the binary data 
from the underlying.
+     * This is a protection for the underlying buffer to be reused by the 
caller protocol.
+     */
+    protected final boolean copyBinary;
+
     public Factory() {
-      this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE);
+      this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
     }
 
-    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
+    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
       this.thriftDefaultBufferSize = thriftDefaultBufferSize;
       this.thriftMaxFrameSize = thriftMaxFrameSize;
+      this.copyBinary = copyBinary;
     }
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TElasticFramedTransport(trans, thriftDefaultBufferSize, 
thriftMaxFrameSize);
+      return new TElasticFramedTransport(
+          trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     }
   }
 
-  public TElasticFramedTransport(TTransport underlying) {
-    this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE);
-  }
-
   public TElasticFramedTransport(
-      TTransport underlying, int thriftDefaultBufferSize, int 
thriftMaxFrameSize) {
+      TTransport underlying,
+      int thriftDefaultBufferSize,
+      int thriftMaxFrameSize,
+      boolean copyBinary) {
     this.underlying = underlying;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
     this.thriftMaxFrameSize = thriftMaxFrameSize;
+    this.copyBinary = copyBinary;
     readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
     writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
   }
@@ -81,6 +89,7 @@ public class TElasticFramedTransport extends TTransport {
   protected AutoScalingBufferReadTransport readBuffer;
   protected AutoScalingBufferWriteTransport writeBuffer;
   protected final byte[] i32buf = new byte[4];
+  private final boolean copyBinary;
 
   @Override
   public boolean isOpen() {
@@ -165,4 +174,26 @@ public class TElasticFramedTransport extends TTransport {
   public TTransport getSocket() {
     return underlying;
   }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    // return -1 can make the caller protocol to copy binary data from the 
underlying transport.
+    if (copyBinary) return -1;
+    return readBuffer.getBytesRemainingInBuffer();
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return readBuffer.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return readBuffer.getBufferPosition();
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    readBuffer.consumeBuffer(len);
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index a85ccf0c5b5..79ebc7983e3 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -29,30 +29,34 @@ public class TSnappyElasticFramedTransport extends 
TCompressedElasticFramedTrans
   public static class Factory extends TElasticFramedTransport.Factory {
 
     public Factory() {
-      this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE);
+      this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
     }
 
-    public Factory(int thriftDefaultBufferSize) {
-      this(thriftDefaultBufferSize, RpcUtils.THRIFT_FRAME_MAX_SIZE);
+    public Factory(int thriftDefaultBufferSize, boolean copyBinary) {
+      this(thriftDefaultBufferSize, RpcUtils.THRIFT_FRAME_MAX_SIZE, 
copyBinary);
     }
 
-    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
-      super(thriftDefaultBufferSize, thriftMaxFrameSize);
+    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
+      super(thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     }
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TSnappyElasticFramedTransport(trans, thriftDefaultBufferSize, 
thriftMaxFrameSize);
+      return new TSnappyElasticFramedTransport(
+          trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     }
   }
 
   public TSnappyElasticFramedTransport(TTransport underlying) {
-    this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE);
+    this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
   }
 
   public TSnappyElasticFramedTransport(
-      TTransport underlying, int thriftDefaultBufferSize, int 
thriftMaxFrameSize) {
-    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+      TTransport underlying,
+      int thriftDefaultBufferSize,
+      int thriftMaxFrameSize,
+      boolean copyBinary) {
+    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
   }
 
   @Override
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index 3c5f54be2ba..77c2bbf6b97 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -31,8 +31,8 @@ public class TimeoutChangeableTFastFramedTransport extends 
TElasticFramedTranspo
   private final TSocket underlyingSocket;
 
   public TimeoutChangeableTFastFramedTransport(
-      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) 
{
-    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
+    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     this.underlyingSocket = underlying;
   }
 
@@ -56,19 +56,22 @@ public class TimeoutChangeableTFastFramedTransport extends 
TElasticFramedTranspo
 
     private final int thriftDefaultBufferSize;
     protected final int thriftMaxFrameSize;
+    protected final boolean copyBinary;
 
-    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
+    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
       this.thriftDefaultBufferSize = thriftDefaultBufferSize;
       this.thriftMaxFrameSize = thriftMaxFrameSize;
+      this.copyBinary = copyBinary;
     }
 
     @Override
     public TTransport getTransport(TTransport trans) {
       if (trans instanceof TSocket) {
         return new TimeoutChangeableTFastFramedTransport(
-            (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize);
+            (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize, 
copyBinary);
       } else {
-        return new TElasticFramedTransport(trans, thriftDefaultBufferSize, 
thriftMaxFrameSize);
+        return new TElasticFramedTransport(
+            trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
       }
     }
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index 590005b8b5e..168f52662aa 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -31,8 +31,8 @@ public class TimeoutChangeableTSnappyFramedTransport extends 
TSnappyElasticFrame
   private final TSocket underlyingSocket;
 
   public TimeoutChangeableTSnappyFramedTransport(
-      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) 
{
-    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
+    super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     this.underlyingSocket = underlying;
   }
 
@@ -56,20 +56,22 @@ public class TimeoutChangeableTSnappyFramedTransport 
extends TSnappyElasticFrame
 
     private final int thriftDefaultBufferSize;
     protected final int thriftMaxFrameSize;
+    protected final boolean copyBinary;
 
-    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
+    public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
       this.thriftDefaultBufferSize = thriftDefaultBufferSize;
       this.thriftMaxFrameSize = thriftMaxFrameSize;
+      this.copyBinary = copyBinary;
     }
 
     @Override
     public TTransport getTransport(TTransport trans) {
       if (trans instanceof TSocket) {
         return new TimeoutChangeableTSnappyFramedTransport(
-            (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize);
+            (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize, 
copyBinary);
       } else {
         return new TSnappyElasticFramedTransport(
-            trans, thriftDefaultBufferSize, thriftMaxFrameSize);
+            trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
       }
     }
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/ZeroCopyRpcTransportFactory.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/ZeroCopyRpcTransportFactory.java
new file mode 100644
index 00000000000..69ad7068cc5
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/ZeroCopyRpcTransportFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TTransportFactory;
+
+public class ZeroCopyRpcTransportFactory extends BaseRpcTransportFactory {
+
+  public static ZeroCopyRpcTransportFactory INSTANCE;
+
+  static {
+    reInit();
+  }
+
+  private ZeroCopyRpcTransportFactory(TTransportFactory inner) {
+    super(inner);
+  }
+
+  public static void reInit() {
+    INSTANCE =
+        USE_SNAPPY
+            ? new ZeroCopyRpcTransportFactory(
+                new TimeoutChangeableTSnappyFramedTransport.Factory(
+                    thriftDefaultBufferSize, thriftMaxFrameSize, false))
+            : new ZeroCopyRpcTransportFactory(
+                new TElasticFramedTransport.Factory(
+                    thriftDefaultBufferSize, thriftMaxFrameSize, false));
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6cb618e8fa9..fb701a65277 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
-import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -151,12 +151,12 @@ public class SessionConnection {
 
   private void init(TEndPoint endPoint, boolean useSSL, String trustStore, 
String trustStorePwd)
       throws IoTDBConnectionException {
-    
RpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize);
-    RpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize);
+    
DeepCopyRpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize);
+    
DeepCopyRpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize);
     try {
       if (useSSL) {
         transport =
-            RpcTransportFactory.INSTANCE.getTransport(
+            DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                 endPoint.getIp(),
                 endPoint.getPort(),
                 session.connectionTimeoutInMs,
@@ -164,7 +164,7 @@ public class SessionConnection {
                 trustStorePwd);
       } else {
         transport =
-            RpcTransportFactory.INSTANCE.getTransport(
+            DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                 // as there is a try-catch already, we do not need to use 
TSocket.wrap
                 endPoint.getIp(), endPoint.getPort(), 
session.connectionTimeoutInMs);
       }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
index 36d2bc81213..5bfbe43ccfc 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.session;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -80,12 +80,12 @@ public class ThriftConnection {
       ZoneId zoneId,
       String version)
       throws IoTDBConnectionException {
-    RpcTransportFactory.setDefaultBufferCapacity(thriftDefaultBufferSize);
-    RpcTransportFactory.setThriftMaxFrameSize(thriftMaxFrameSize);
+    
DeepCopyRpcTransportFactory.setDefaultBufferCapacity(thriftDefaultBufferSize);
+    DeepCopyRpcTransportFactory.setThriftMaxFrameSize(thriftMaxFrameSize);
     try {
       if (useSSL) {
         transport =
-            RpcTransportFactory.INSTANCE.getTransport(
+            DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                 endPoint.getIp(),
                 endPoint.getPort(),
                 connectionTimeoutInMs,
@@ -93,7 +93,7 @@ public class ThriftConnection {
                 trustStorePwd);
       } else {
         transport =
-            RpcTransportFactory.INSTANCE.getTransport(
+            DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                 // as there is a try-catch already, we do not need to use 
TSocket.wrap
                 endPoint.getIp(), endPoint.getPort(), connectionTimeoutInMs);
       }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
index b73ce6f8b08..77134444030 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode 
*/
 public class ConfigNodeRPCService extends ThriftService implements 
ConfigNodeRPCServiceMBean {
@@ -69,7 +70,8 @@ public class ConfigNodeRPCService extends ThriftService 
implements ConfigNodeRPC
               configConf.getCnRpcMaxConcurrentClientNum(),
               configConf.getThriftServerAwaitTimeForStopService(),
               new ConfigNodeRPCServiceHandler(),
-              commonConfig.isRpcThriftCompressionEnabled());
+              commonConfig.isRpcThriftCompressionEnabled(),
+              DeepCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
index e3815d3c3e5..da120fd56d7 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.client.factory.ThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
 
 import org.apache.commons.pool2.PooledObject;
@@ -50,7 +50,7 @@ public class SyncIoTConsensusServiceClient extends 
IoTConsensusIService.Client
         property
             .getProtocolFactory()
             .getProtocol(
-                RpcTransportFactory.INSTANCE.getTransport(
+                DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                     new TSocket(
                         TConfigurationConst.defaultTConfiguration,
                         endpoint.getIp(),
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
index 567b23cefc6..7be6dbefc99 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 
 import org.apache.thrift.TBaseAsyncProcessor;
 import org.slf4j.Logger;
@@ -85,7 +86,8 @@ public class IoTConsensusRPCService extends ThriftService 
implements IoTConsensu
               config.getRpc().isRpcThriftCompressionEnabled(),
               config.getRpc().getConnectionTimeoutInMs(),
               config.getRpc().getThriftMaxFrameSize(),
-              ThriftServiceThread.ServerType.SELECTOR);
+              ThriftServiceThread.ServerType.SELECTOR,
+              ZeroCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 682d3d00022..6be4d80e36d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -41,8 +41,9 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLe
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
 import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.BaseRpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -2516,7 +2517,7 @@ public class IoTDBConfig {
 
   public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
     this.thriftMaxFrameSize = thriftMaxFrameSize;
-    RpcTransportFactory.setThriftMaxFrameSize(this.thriftMaxFrameSize);
+    BaseRpcTransportFactory.setThriftMaxFrameSize(this.thriftMaxFrameSize);
   }
 
   public int getThriftDefaultBufferSize() {
@@ -2525,7 +2526,7 @@ public class IoTDBConfig {
 
   public void setThriftDefaultBufferSize(int thriftDefaultBufferSize) {
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
-    RpcTransportFactory.setDefaultBufferCapacity(this.thriftDefaultBufferSize);
+    BaseRpcTransportFactory.setDefaultBufferCapacity(this.thriftDefaultBufferSize);
   }
 
   public int getCheckPeriodWhenInsertBlocked() {
@@ -2606,7 +2607,7 @@ public class IoTDBConfig {
 
   public void setRpcAdvancedCompressionEnable(boolean rpcAdvancedCompressionEnable) {
     this.rpcAdvancedCompressionEnable = rpcAdvancedCompressionEnable;
-    RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable);
+    ZeroCopyRpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable);
   }
 
   public int getMlogBufferSize() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ba8a5f41c40..3a3693e3b7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -53,7 +53,8 @@ import 
org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalMemoryReporter;
 import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter;
 import org.apache.iotdb.metrics.utils.InternalReporterType;
 import org.apache.iotdb.metrics.utils.NodeType;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -975,7 +976,8 @@ public class IoTDBDescriptor {
     loadTsFileProps(properties);
 
     // make RPCTransportFactory taking effect.
-    RpcTransportFactory.reInit();
+    ZeroCopyRpcTransportFactory.reInit();
+    DeepCopyRpcTransportFactory.reInit();
 
     // UDF
     loadUDFProps(properties);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index fe0b2ac4073..30481ae9cf2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -180,7 +180,7 @@ public class IoTDBLegacyPipeReceiverAgent {
     // step2. deserialize PipeData
     PipeData pipeData;
     try {
-      int length = buff.capacity();
+      int length = buff.remaining();
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
       pipeData = PipeData.createPipeData(byteArray);
@@ -288,7 +288,7 @@ public class IoTDBLegacyPipeReceiverAgent {
 
     // step3. append file
     try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
-      int length = buff.capacity();
+      int length = buff.remaining();
       randomAccessFile.seek(startIndex);
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index ddaf6dd75ff..49521a7132f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -129,7 +129,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.pool2.PooledObject;
@@ -195,7 +195,7 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
   public void connect(TEndPoint endpoint) throws TException {
     try {
       transport =
-          RpcTransportFactory.INSTANCE.getTransport(
+          DeepCopyRpcTransportFactory.INSTANCE.getTransport(
               // As there is a try-catch already, we do not need to use TSocket.wrap
               endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs());
       if (!transport.isOpen()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
index b42e521bfd8..51feec4b805 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService.Processor;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,7 +98,8 @@ public class MPPDataExchangeService extends ThriftService 
implements MPPDataExch
               config.getRpcMaxConcurrentClientNum(),
               config.getThriftServerAwaitTimeForStopService(),
               new MPPDataExchangeServiceThriftHandler(),
-              config.isRpcThriftCompressionEnable());
+              config.isRpcThriftCompressionEnable(),
+              DeepCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
index 416dde14982..83e4869531f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
 import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 
 public class DataNodeInternalRPCService extends ThriftService
     implements DataNodeInternalRPCServiceMBean {
@@ -67,7 +68,8 @@ public class DataNodeInternalRPCService extends ThriftService
               config.getRpcMaxConcurrentClientNum(),
               config.getThriftServerAwaitTimeForStopService(),
               new InternalServiceThriftHandler(),
-              config.isRpcThriftCompressionEnable());
+              config.isRpcThriftCompressionEnable(),
+              DeepCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
index af747cb1dd0..0ddd8688637 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics;
 import org.apache.iotdb.db.protocol.thrift.handler.RPCServiceThriftHandler;
 import org.apache.iotdb.db.protocol.thrift.impl.IClientRPCServiceWithHandler;
 import org.apache.iotdb.db.service.metrics.RPCServiceMetrics;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 
 import java.lang.reflect.InvocationTargetException;
 
@@ -73,7 +74,8 @@ public class RPCService extends ThriftService implements 
RPCServiceMBean {
                 
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable(),
                 config.getKeyStorePath(),
                 config.getKeyStorePwd(),
-                config.getConnectionTimeoutInMS());
+                config.getConnectionTimeoutInMS(),
+                ZeroCopyRpcTransportFactory.INSTANCE);
       } else {
         thriftServiceThread =
             new ThriftServiceThread(
@@ -85,9 +87,9 @@ public class RPCService extends ThriftService implements 
RPCServiceMBean {
                 config.getRpcMaxConcurrentClientNum(),
                 config.getThriftServerAwaitTimeForStopService(),
                 new RPCServiceThriftHandler(impl),
-                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable(),
+                ZeroCopyRpcTransportFactory.INSTANCE);
       }
-
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 4e1df10520a..99350ed32f7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
@@ -52,7 +52,7 @@ public class SyncConfigNodeIServiceClient extends 
IConfigNodeRPCService.Client
         property
             .getProtocolFactory()
             .getProtocol(
-                RpcTransportFactory.INSTANCE.getTransport(
+                DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                     new TSocket(
                         TConfigurationConst.defaultTConfiguration,
                         endPoint.getIp(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 2b90b5b2661..b82968d4f7d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.client.factory.ThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
@@ -53,7 +53,7 @@ public class SyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Clien
         property
             .getProtocolFactory()
             .getProtocol(
-                RpcTransportFactory.INSTANCE.getTransport(
+                DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                     new TSocket(
                         TConfigurationConst.defaultTConfiguration,
                         endpoint.getIp(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
index 024b7515fc8..592bb1ddccd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
@@ -52,7 +52,7 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends 
MPPDataExchangeSer
         property
             .getProtocolFactory()
             .getProtocol(
-                RpcTransportFactory.INSTANCE.getTransport(
+                DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                     new TSocket(
                         TConfigurationConst.defaultTConfiguration,
                         endpoint.getIp(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index fd0fc10231f..8bbefd80798 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.connector.client;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
 
@@ -44,13 +44,13 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
             .getProtocolFactory()
             .getProtocol(
                 useSSL
-                    ? RpcTransportFactory.INSTANCE.getTransport(
+                    ? DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                         ipAddress,
                         port,
                         property.getConnectionTimeoutMs(),
                         trustStore,
                         trustStorePwd)
-                    : RpcTransportFactory.INSTANCE.getTransport(
+                    : DeepCopyRpcTransportFactory.INSTANCE.getTransport(
                         ipAddress, port, property.getConnectionTimeoutMs())));
     TTransport transport = getInputProtocol().getTransport();
     if (!transport.isOpen()) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 12eea072f32..7cdd69d8a8c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -62,6 +62,8 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
 
   private TProtocolFactory protocolFactory;
 
+  private TTransportFactory transportFactory;
+
   // currently, we can reuse the ProtocolFactory instance.
   private static TCompactProtocol.Factory compactProtocolFactory = new TCompactProtocol.Factory();
   private static TBinaryProtocol.Factory binaryProtocolFactory = new TBinaryProtocol.Factory();
@@ -70,7 +72,9 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
     protocolFactory = getProtocolFactory(compress);
   }
 
-  public abstract TTransportFactory getTTransportFactory();
+  public TTransportFactory getTTransportFactory() {
+    return transportFactory;
+  }
 
   public static TProtocolFactory getProtocolFactory(boolean compress) {
     if (compress) {
@@ -116,7 +120,9 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
       boolean compress,
       int connectionTimeoutInMS,
       int maxReadBufferBytes,
-      ServerType serverType) {
+      ServerType serverType,
+      TTransportFactory transportFactory) {
+    this.transportFactory = transportFactory;
     initProtocolFactory(compress);
     this.serviceName = serviceName;
     try {
@@ -168,7 +174,9 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
       boolean compress,
       String keyStorePath,
       String keyStorePwd,
-      int clientTimeout) {
+      int clientTimeout,
+      TTransportFactory transportFactory) {
+    this.transportFactory = transportFactory;
     initProtocolFactory(compress);
     this.serviceName = serviceName;
 
@@ -200,7 +208,9 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
       int maxWorkerThreads,
       int timeoutSecond,
       TServerEventHandler serverEventHandler,
-      boolean compress) {
+      boolean compress,
+      TTransportFactory transportFactory) {
+    this.transportFactory = transportFactory;
     initProtocolFactory(compress);
     this.serviceName = serviceName;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index ad5db5504e5..77f25ff6ab0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.commons.service;
 
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
 import org.apache.thrift.TBaseAsyncProcessor;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.server.TServerEventHandler;
@@ -44,7 +42,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
       boolean compress,
       int connectionTimeoutInMS,
       int maxReadBufferBytes,
-      ServerType serverType) {
+      ServerType serverType,
+      TTransportFactory transportFactory) {
     super(
         processor,
         serviceName,
@@ -59,7 +58,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
         compress,
         connectionTimeoutInMS,
         maxReadBufferBytes,
-        serverType);
+        serverType,
+        transportFactory);
   }
 
   /** for synced ThriftServiceThread */
@@ -76,7 +76,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
       boolean compress,
       String keyStorePath,
       String keyStorePwd,
-      int clientTimeout) {
+      int clientTimeout,
+      TTransportFactory transportFactory) {
     super(
         processor,
         serviceName,
@@ -89,7 +90,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
         compress,
         keyStorePath,
         keyStorePwd,
-        clientTimeout);
+        clientTimeout,
+        transportFactory);
   }
 
   public ThriftServiceThread(
@@ -101,7 +103,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
       int maxWorkerThreads,
       int timeoutSecond,
       TServerEventHandler serverEventHandler,
-      boolean compress) {
+      boolean compress,
+      TTransportFactory transportFactory) {
     super(
         processor,
         serviceName,
@@ -111,11 +114,7 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
         maxWorkerThreads,
         timeoutSecond,
         serverEventHandler,
-        compress);
-  }
-
-  @Override
-  public TTransportFactory getTTransportFactory() {
-    return RpcTransportFactory.INSTANCE;
+        compress,
+        transportFactory);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
index 771c6cf840a..386a212436f 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 
 import org.apache.thrift.server.TServerEventHandler;
 
@@ -69,7 +70,8 @@ public class MockInternalRPCService extends ThriftService 
implements MockInterna
               65535,
               60,
               mock(TServerEventHandler.class),
-              false);
+              false,
+              DeepCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }

Reply via email to