This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5b056644f28 Thrift zero-copy optimization for TElasticFramedTransport
(#12050)
5b056644f28 is described below
commit 5b056644f283b0ce07b025746895436ac0431c4f
Author: Mrquan <[email protected]>
AuthorDate: Fri Mar 15 18:03:46 2024 +0800
Thrift zero-copy optimization for TElasticFramedTransport (#12050)
---
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 10 ++--
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 65 ----------------------
...rtFactory.java => BaseRpcTransportFactory.java} | 30 +++-------
.../iotdb/rpc/DeepCopyRpcTransportFactory.java | 46 +++++++++++++++
.../rpc/TCompressedElasticFramedTransport.java | 7 ++-
.../apache/iotdb/rpc/TElasticFramedTransport.java | 47 +++++++++++++---
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 22 +++++---
.../rpc/TimeoutChangeableTFastFramedTransport.java | 13 +++--
.../TimeoutChangeableTSnappyFramedTransport.java | 12 ++--
.../iotdb/rpc/ZeroCopyRpcTransportFactory.java | 46 +++++++++++++++
.../apache/iotdb/session/SessionConnection.java | 10 ++--
.../org/apache/iotdb/session/ThriftConnection.java | 10 ++--
.../service/thrift/ConfigNodeRPCService.java | 4 +-
.../iot/client/SyncIoTConsensusServiceClient.java | 4 +-
.../iot/service/IoTConsensusRPCService.java | 4 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 9 +--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +-
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 4 +-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 4 +-
.../execution/exchange/MPPDataExchangeService.java | 4 +-
.../db/service/DataNodeInternalRPCService.java | 4 +-
.../org/apache/iotdb/db/service/RPCService.java | 8 ++-
.../client/sync/SyncConfigNodeIServiceClient.java | 4 +-
.../sync/SyncDataNodeInternalServiceClient.java | 4 +-
.../SyncDataNodeMPPDataExchangeServiceClient.java | 4 +-
.../pipe/connector/client/IoTDBSyncClient.java | 6 +-
.../service/AbstractThriftServiceThread.java | 18 ++++--
.../iotdb/commons/service/ThriftServiceThread.java | 25 ++++-----
.../client/mock/MockInternalRPCService.java | 4 +-
29 files changed, 256 insertions(+), 178 deletions(-)
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 39cd2f54012..9f8142fb1b3 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.jdbc;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -465,12 +465,12 @@ public class IoTDBConnection implements Connection {
}
private void openTransport() throws TTransportException {
-
RpcTransportFactory.setDefaultBufferCapacity(params.getThriftDefaultBufferSize());
- RpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
+
DeepCopyRpcTransportFactory.setDefaultBufferCapacity(params.getThriftDefaultBufferSize());
+
DeepCopyRpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
if (params.isUseSSL()) {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
params.getHost(),
params.getPort(),
getNetworkTimeout(),
@@ -478,7 +478,7 @@ public class IoTDBConnection implements Connection {
params.getTrustStorePwd());
} else {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
params.getHost(), params.getPort(), getNetworkTimeout());
}
if (!transport.isOpen()) {
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 3ff05033b98..1c95abed37f 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -29,13 +29,9 @@ import
org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
-import java.nio.ByteBuffer;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -46,8 +42,6 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
public class IoTDBStatement implements Statement {
@@ -269,7 +263,6 @@ public class IoTDBStatement implements Statement {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
- deepCopyResp(execResp);
if (execResp.isSetColumns()) {
queryId = execResp.getQueryId();
if (execResp.queryResult == null) {
@@ -418,9 +411,6 @@ public class IoTDBStatement implements Statement {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
- // Because different result sets share the TTransport and buffer, if the
previous result set was
- // not consumed timely, the byte buffer will be overwritten by the
incoming result set
- deepCopyResp(execResp);
BitSet aliasColumn = null;
if (execResp.getAliasColumns() != null &&
!execResp.getAliasColumns().isEmpty()) {
aliasColumn = listToBitSet(execResp.getAliasColumns());
@@ -475,61 +465,6 @@ public class IoTDBStatement implements Statement {
return BitSet.valueOf(byteAlias);
}
- private void deepCopyResp(TSExecuteStatementResp queryRes) {
- final TSQueryDataSet tsQueryDataSet = queryRes.getQueryDataSet();
- final TSQueryNonAlignDataSet nonAlignDataSet =
queryRes.getNonAlignQueryDataSet();
-
- if (Objects.nonNull(tsQueryDataSet)) {
- deepCopyTsQueryDataSet(tsQueryDataSet);
- } else if (Objects.nonNull(nonAlignDataSet)) {
- deepCopyNonAlignQueryDataSet(nonAlignDataSet);
- } else {
- deepCopyQueryResult(queryRes);
- }
- }
-
- private void deepCopyQueryResult(TSExecuteStatementResp queryRes) {
- List<ByteBuffer> queryResult = queryRes.getQueryResult();
- if (queryResult == null) {
- return;
- }
- final List<ByteBuffer> queryResultCopy =
-
queryResult.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
- queryRes.setQueryResult(queryResultCopy);
- }
-
- private void deepCopyNonAlignQueryDataSet(TSQueryNonAlignDataSet
nonAlignDataSet) {
- if (Objects.isNull(nonAlignDataSet)) {
- return;
- }
-
- final List<ByteBuffer> valueList =
- nonAlignDataSet.valueList.stream()
- .map(ReadWriteIOUtils::clone)
- .collect(Collectors.toList());
-
- final List<ByteBuffer> timeList =
-
nonAlignDataSet.timeList.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
-
- nonAlignDataSet.setTimeList(timeList);
- nonAlignDataSet.setValueList(valueList);
- }
-
- private void deepCopyTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
- final ByteBuffer time = ReadWriteIOUtils.clone(tsQueryDataSet.time);
- final List<ByteBuffer> valueList =
-
tsQueryDataSet.valueList.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
-
- final List<ByteBuffer> bitmapList =
- tsQueryDataSet.bitmapList.stream()
- .map(ReadWriteIOUtils::clone)
- .collect(Collectors.toList());
-
- tsQueryDataSet.setBitmapList(bitmapList);
- tsQueryDataSet.setValueList(valueList);
- tsQueryDataSet.setTime(time);
- }
-
@Override
public int executeUpdate(String sql) throws SQLException {
checkConnection("execute update");
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/BaseRpcTransportFactory.java
similarity index 79%
rename from
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
rename to
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/BaseRpcTransportFactory.java
index 191f0a42f95..61fcf52671f 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/BaseRpcTransportFactory.java
@@ -27,22 +27,17 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@SuppressWarnings("java:S1135") // ignore todos
-public class RpcTransportFactory extends TTransportFactory {
+public class BaseRpcTransportFactory extends TTransportFactory {
// TODO: make it a config
public static boolean USE_SNAPPY = false;
- public static RpcTransportFactory INSTANCE;
- private static int thriftDefaultBufferSize =
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
- private static int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;
+ protected static int thriftDefaultBufferSize =
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
+ protected static int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;
- static {
- reInit();
- }
-
- private final TTransportFactory inner;
+ protected final TTransportFactory inner;
- private RpcTransportFactory(TTransportFactory inner) {
+ protected BaseRpcTransportFactory(TTransportFactory inner) {
this.inner = inner;
}
@@ -103,21 +98,10 @@ public class RpcTransportFactory extends TTransportFactory
{
}
public static void setDefaultBufferCapacity(int thriftDefaultBufferSize) {
- RpcTransportFactory.thriftDefaultBufferSize = thriftDefaultBufferSize;
+ BaseRpcTransportFactory.thriftDefaultBufferSize = thriftDefaultBufferSize;
}
public static void setThriftMaxFrameSize(int thriftMaxFrameSize) {
- RpcTransportFactory.thriftMaxFrameSize = thriftMaxFrameSize;
- }
-
- public static void reInit() {
- INSTANCE =
- USE_SNAPPY
- ? new RpcTransportFactory(
- new TimeoutChangeableTSnappyFramedTransport.Factory(
- thriftDefaultBufferSize, thriftMaxFrameSize))
- : new RpcTransportFactory(
- new TimeoutChangeableTFastFramedTransport.Factory(
- thriftDefaultBufferSize, thriftMaxFrameSize));
+ BaseRpcTransportFactory.thriftMaxFrameSize = thriftMaxFrameSize;
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/DeepCopyRpcTransportFactory.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/DeepCopyRpcTransportFactory.java
new file mode 100644
index 00000000000..83f41275f2e
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/DeepCopyRpcTransportFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TTransportFactory;
+
+public class DeepCopyRpcTransportFactory extends BaseRpcTransportFactory {
+
+ public static DeepCopyRpcTransportFactory INSTANCE;
+
+ static {
+ reInit();
+ }
+
+ private DeepCopyRpcTransportFactory(TTransportFactory inner) {
+ super(inner);
+ }
+
+ public static void reInit() {
+ INSTANCE =
+ USE_SNAPPY
+ ? new DeepCopyRpcTransportFactory(
+ new TimeoutChangeableTSnappyFramedTransport.Factory(
+ thriftDefaultBufferSize, thriftMaxFrameSize, true))
+ : new DeepCopyRpcTransportFactory(
+ new TElasticFramedTransport.Factory(
+ thriftDefaultBufferSize, thriftMaxFrameSize, true));
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index e8cb88f2c86..a3b4f38064a 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -31,8 +31,11 @@ public abstract class TCompressedElasticFramedTransport
extends TElasticFramedTr
private AutoScalingBufferReadTransport readCompressBuffer;
protected TCompressedElasticFramedTransport(
- TTransport underlying, int thriftDefaultBufferSize, int
thriftMaxFrameSize) {
- super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+ TTransport underlying,
+ int thriftDefaultBufferSize,
+ int thriftMaxFrameSize,
+ boolean copyBinary) {
+ super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
writeCompressBuffer = new
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
readCompressBuffer = new
AutoScalingBufferReadTransport(thriftDefaultBufferSize);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 084a3240d84..1075e1de865 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -46,30 +46,38 @@ public class TElasticFramedTransport extends TTransport {
*/
protected final int thriftDefaultBufferSize;
+ /**
+ * When copyBinary flag is true, the transport will copy the binary data
from the underlying.
+ * This is a protection for the underlying buffer to be reused by the
caller protocol.
+ */
+ protected final boolean copyBinary;
+
public Factory() {
- this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE);
+ this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
}
- public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
+ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
+ this.copyBinary = copyBinary;
}
@Override
public TTransport getTransport(TTransport trans) {
- return new TElasticFramedTransport(trans, thriftDefaultBufferSize,
thriftMaxFrameSize);
+ return new TElasticFramedTransport(
+ trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
}
- public TElasticFramedTransport(TTransport underlying) {
- this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE);
- }
-
public TElasticFramedTransport(
- TTransport underlying, int thriftDefaultBufferSize, int
thriftMaxFrameSize) {
+ TTransport underlying,
+ int thriftDefaultBufferSize,
+ int thriftMaxFrameSize,
+ boolean copyBinary) {
this.underlying = underlying;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
+ this.copyBinary = copyBinary;
readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
}
@@ -81,6 +89,7 @@ public class TElasticFramedTransport extends TTransport {
protected AutoScalingBufferReadTransport readBuffer;
protected AutoScalingBufferWriteTransport writeBuffer;
protected final byte[] i32buf = new byte[4];
+ private final boolean copyBinary;
@Override
public boolean isOpen() {
@@ -165,4 +174,26 @@ public class TElasticFramedTransport extends TTransport {
public TTransport getSocket() {
return underlying;
}
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ // return -1 can make the caller protocol to copy binary data from the
underlying transport.
+ if (copyBinary) return -1;
+ return readBuffer.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer.getBufferPosition();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer.consumeBuffer(len);
+ }
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index a85ccf0c5b5..79ebc7983e3 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -29,30 +29,34 @@ public class TSnappyElasticFramedTransport extends
TCompressedElasticFramedTrans
public static class Factory extends TElasticFramedTransport.Factory {
public Factory() {
- this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE);
+ this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
}
- public Factory(int thriftDefaultBufferSize) {
- this(thriftDefaultBufferSize, RpcUtils.THRIFT_FRAME_MAX_SIZE);
+ public Factory(int thriftDefaultBufferSize, boolean copyBinary) {
+ this(thriftDefaultBufferSize, RpcUtils.THRIFT_FRAME_MAX_SIZE,
copyBinary);
}
- public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
- super(thriftDefaultBufferSize, thriftMaxFrameSize);
+ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
+ super(thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
@Override
public TTransport getTransport(TTransport trans) {
- return new TSnappyElasticFramedTransport(trans, thriftDefaultBufferSize,
thriftMaxFrameSize);
+ return new TSnappyElasticFramedTransport(
+ trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
}
public TSnappyElasticFramedTransport(TTransport underlying) {
- this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE);
+ this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
}
public TSnappyElasticFramedTransport(
- TTransport underlying, int thriftDefaultBufferSize, int
thriftMaxFrameSize) {
- super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+ TTransport underlying,
+ int thriftDefaultBufferSize,
+ int thriftMaxFrameSize,
+ boolean copyBinary) {
+ super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
@Override
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index 3c5f54be2ba..77c2bbf6b97 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -31,8 +31,8 @@ public class TimeoutChangeableTFastFramedTransport extends
TElasticFramedTranspo
private final TSocket underlyingSocket;
public TimeoutChangeableTFastFramedTransport(
- TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize)
{
- super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+ TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
+ super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
this.underlyingSocket = underlying;
}
@@ -56,19 +56,22 @@ public class TimeoutChangeableTFastFramedTransport extends
TElasticFramedTranspo
private final int thriftDefaultBufferSize;
protected final int thriftMaxFrameSize;
+ protected final boolean copyBinary;
- public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
+ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
+ this.copyBinary = copyBinary;
}
@Override
public TTransport getTransport(TTransport trans) {
if (trans instanceof TSocket) {
return new TimeoutChangeableTFastFramedTransport(
- (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize);
+ (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize,
copyBinary);
} else {
- return new TElasticFramedTransport(trans, thriftDefaultBufferSize,
thriftMaxFrameSize);
+ return new TElasticFramedTransport(
+ trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index 590005b8b5e..168f52662aa 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -31,8 +31,8 @@ public class TimeoutChangeableTSnappyFramedTransport extends
TSnappyElasticFrame
private final TSocket underlyingSocket;
public TimeoutChangeableTSnappyFramedTransport(
- TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize)
{
- super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
+ TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
+ super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
this.underlyingSocket = underlying;
}
@@ -56,20 +56,22 @@ public class TimeoutChangeableTSnappyFramedTransport
extends TSnappyElasticFrame
private final int thriftDefaultBufferSize;
protected final int thriftMaxFrameSize;
+ protected final boolean copyBinary;
- public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
+ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
+ this.copyBinary = copyBinary;
}
@Override
public TTransport getTransport(TTransport trans) {
if (trans instanceof TSocket) {
return new TimeoutChangeableTSnappyFramedTransport(
- (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize);
+ (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize,
copyBinary);
} else {
return new TSnappyElasticFramedTransport(
- trans, thriftDefaultBufferSize, thriftMaxFrameSize);
+ trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/ZeroCopyRpcTransportFactory.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/ZeroCopyRpcTransportFactory.java
new file mode 100644
index 00000000000..69ad7068cc5
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/ZeroCopyRpcTransportFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TTransportFactory;
+
+public class ZeroCopyRpcTransportFactory extends BaseRpcTransportFactory {
+
+ public static ZeroCopyRpcTransportFactory INSTANCE;
+
+ static {
+ reInit();
+ }
+
+ private ZeroCopyRpcTransportFactory(TTransportFactory inner) {
+ super(inner);
+ }
+
+ public static void reInit() {
+ INSTANCE =
+ USE_SNAPPY
+ ? new ZeroCopyRpcTransportFactory(
+ new TimeoutChangeableTSnappyFramedTransport.Factory(
+ thriftDefaultBufferSize, thriftMaxFrameSize, false))
+ : new ZeroCopyRpcTransportFactory(
+ new TElasticFramedTransport.Factory(
+ thriftDefaultBufferSize, thriftMaxFrameSize, false));
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6cb618e8fa9..fb701a65277 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
-import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -151,12 +151,12 @@ public class SessionConnection {
private void init(TEndPoint endPoint, boolean useSSL, String trustStore,
String trustStorePwd)
throws IoTDBConnectionException {
-
RpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize);
- RpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize);
+
DeepCopyRpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize);
+
DeepCopyRpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize);
try {
if (useSSL) {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
endPoint.getIp(),
endPoint.getPort(),
session.connectionTimeoutInMs,
@@ -164,7 +164,7 @@ public class SessionConnection {
trustStorePwd);
} else {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
// as there is a try-catch already, we do not need to use
TSocket.wrap
endPoint.getIp(), endPoint.getPort(),
session.connectionTimeoutInMs);
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
index 36d2bc81213..5bfbe43ccfc 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -80,12 +80,12 @@ public class ThriftConnection {
ZoneId zoneId,
String version)
throws IoTDBConnectionException {
- RpcTransportFactory.setDefaultBufferCapacity(thriftDefaultBufferSize);
- RpcTransportFactory.setThriftMaxFrameSize(thriftMaxFrameSize);
+
DeepCopyRpcTransportFactory.setDefaultBufferCapacity(thriftDefaultBufferSize);
+ DeepCopyRpcTransportFactory.setThriftMaxFrameSize(thriftMaxFrameSize);
try {
if (useSSL) {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
endPoint.getIp(),
endPoint.getPort(),
connectionTimeoutInMs,
@@ -93,7 +93,7 @@ public class ThriftConnection {
trustStorePwd);
} else {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
// as there is a try-catch already, we do not need to use
TSocket.wrap
endPoint.getIp(), endPoint.getPort(), connectionTimeoutInMs);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
index b73ce6f8b08..77134444030 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode
*/
public class ConfigNodeRPCService extends ThriftService implements
ConfigNodeRPCServiceMBean {
@@ -69,7 +70,8 @@ public class ConfigNodeRPCService extends ThriftService
implements ConfigNodeRPC
configConf.getCnRpcMaxConcurrentClientNum(),
configConf.getThriftServerAwaitTimeForStopService(),
new ConfigNodeRPCServiceHandler(),
- commonConfig.isRpcThriftCompressionEnabled());
+ commonConfig.isRpcThriftCompressionEnabled(),
+ DeepCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
index e3815d3c3e5..da120fd56d7 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.commons.pool2.PooledObject;
@@ -50,7 +50,7 @@ public class SyncIoTConsensusServiceClient extends
IoTConsensusIService.Client
property
.getProtocolFactory()
.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
endpoint.getIp(),
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
index 567b23cefc6..7be6dbefc99 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.service.ThriftService;
import org.apache.iotdb.commons.service.ThriftServiceThread;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import org.apache.thrift.TBaseAsyncProcessor;
import org.slf4j.Logger;
@@ -85,7 +86,8 @@ public class IoTConsensusRPCService extends ThriftService
implements IoTConsensu
config.getRpc().isRpcThriftCompressionEnabled(),
config.getRpc().getConnectionTimeoutInMs(),
config.getRpc().getThriftMaxFrameSize(),
- ThriftServiceThread.ServerType.SELECTOR);
+ ThriftServiceThread.ServerType.SELECTOR,
+ ZeroCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 682d3d00022..6be4d80e36d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -41,8 +41,9 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLe
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.BaseRpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -2516,7 +2517,7 @@ public class IoTDBConfig {
public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
this.thriftMaxFrameSize = thriftMaxFrameSize;
- RpcTransportFactory.setThriftMaxFrameSize(this.thriftMaxFrameSize);
+ BaseRpcTransportFactory.setThriftMaxFrameSize(this.thriftMaxFrameSize);
}
public int getThriftDefaultBufferSize() {
@@ -2525,7 +2526,7 @@ public class IoTDBConfig {
public void setThriftDefaultBufferSize(int thriftDefaultBufferSize) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
- RpcTransportFactory.setDefaultBufferCapacity(this.thriftDefaultBufferSize);
+
BaseRpcTransportFactory.setDefaultBufferCapacity(this.thriftDefaultBufferSize);
}
public int getCheckPeriodWhenInsertBlocked() {
@@ -2606,7 +2607,7 @@ public class IoTDBConfig {
public void setRpcAdvancedCompressionEnable(boolean
rpcAdvancedCompressionEnable) {
this.rpcAdvancedCompressionEnable = rpcAdvancedCompressionEnable;
- RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable);
+
ZeroCopyRpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable);
}
public int getMlogBufferSize() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ba8a5f41c40..3a3693e3b7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -53,7 +53,8 @@ import
org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalMemoryReporter;
import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter;
import org.apache.iotdb.metrics.utils.InternalReporterType;
import org.apache.iotdb.metrics.utils.NodeType;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -975,7 +976,8 @@ public class IoTDBDescriptor {
loadTsFileProps(properties);
// make RPCTransportFactory taking effect.
- RpcTransportFactory.reInit();
+ ZeroCopyRpcTransportFactory.reInit();
+ DeepCopyRpcTransportFactory.reInit();
// UDF
loadUDFProps(properties);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index fe0b2ac4073..30481ae9cf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -180,7 +180,7 @@ public class IoTDBLegacyPipeReceiverAgent {
// step2. deserialize PipeData
PipeData pipeData;
try {
- int length = buff.capacity();
+ int length = buff.remaining();
byte[] byteArray = new byte[length];
buff.get(byteArray);
pipeData = PipeData.createPipeData(byteArray);
@@ -288,7 +288,7 @@ public class IoTDBLegacyPipeReceiverAgent {
// step3. append file
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"))
{
- int length = buff.capacity();
+ int length = buff.remaining();
randomAccessFile.seek(startIndex);
byte[] byteArray = new byte[length];
buff.get(byteArray);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index ddaf6dd75ff..49521a7132f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -129,7 +129,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.pool2.PooledObject;
@@ -195,7 +195,7 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
public void connect(TEndPoint endpoint) throws TException {
try {
transport =
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
// As there is a try-catch already, we do not need to use
TSocket.wrap
endpoint.getIp(), endpoint.getPort(),
property.getConnectionTimeoutMs());
if (!transport.isOpen()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
index b42e521bfd8..51feec4b805 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeService.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService.Processor;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,7 +98,8 @@ public class MPPDataExchangeService extends ThriftService
implements MPPDataExch
config.getRpcMaxConcurrentClientNum(),
config.getThriftServerAwaitTimeForStopService(),
new MPPDataExchangeServiceThriftHandler(),
- config.isRpcThriftCompressionEnable());
+ config.isRpcThriftCompressionEnable(),
+ DeepCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
index 416dde14982..83e4869531f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
public class DataNodeInternalRPCService extends ThriftService
implements DataNodeInternalRPCServiceMBean {
@@ -67,7 +68,8 @@ public class DataNodeInternalRPCService extends ThriftService
config.getRpcMaxConcurrentClientNum(),
config.getThriftServerAwaitTimeForStopService(),
new InternalServiceThriftHandler(),
- config.isRpcThriftCompressionEnable());
+ config.isRpcThriftCompressionEnable(),
+ DeepCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
index af747cb1dd0..0ddd8688637 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics;
import org.apache.iotdb.db.protocol.thrift.handler.RPCServiceThriftHandler;
import org.apache.iotdb.db.protocol.thrift.impl.IClientRPCServiceWithHandler;
import org.apache.iotdb.db.service.metrics.RPCServiceMetrics;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import java.lang.reflect.InvocationTargetException;
@@ -73,7 +74,8 @@ public class RPCService extends ThriftService implements
RPCServiceMBean {
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable(),
config.getKeyStorePath(),
config.getKeyStorePwd(),
- config.getConnectionTimeoutInMS());
+ config.getConnectionTimeoutInMS(),
+ ZeroCopyRpcTransportFactory.INSTANCE);
} else {
thriftServiceThread =
new ThriftServiceThread(
@@ -85,9 +87,9 @@ public class RPCService extends ThriftService implements
RPCServiceMBean {
config.getRpcMaxConcurrentClientNum(),
config.getThriftServerAwaitTimeForStopService(),
new RPCServiceThriftHandler(impl),
-
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable(),
+ ZeroCopyRpcTransportFactory.INSTANCE);
}
-
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 4e1df10520a..99350ed32f7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
@@ -52,7 +52,7 @@ public class SyncConfigNodeIServiceClient extends
IConfigNodeRPCService.Client
property
.getProtocolFactory()
.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
endPoint.getIp(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 2b90b5b2661..b82968d4f7d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
@@ -53,7 +53,7 @@ public class SyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Clien
property
.getProtocolFactory()
.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
endpoint.getIp(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
index 024b7515fc8..592bb1ddccd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
@@ -52,7 +52,7 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends
MPPDataExchangeSer
property
.getProtocolFactory()
.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
endpoint.getIp(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index fd0fc10231f..8bbefd80798 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.connector.client;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
@@ -44,13 +44,13 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
.getProtocolFactory()
.getProtocol(
useSSL
- ? RpcTransportFactory.INSTANCE.getTransport(
+ ? DeepCopyRpcTransportFactory.INSTANCE.getTransport(
ipAddress,
port,
property.getConnectionTimeoutMs(),
trustStore,
trustStorePwd)
- : RpcTransportFactory.INSTANCE.getTransport(
+ : DeepCopyRpcTransportFactory.INSTANCE.getTransport(
ipAddress, port, property.getConnectionTimeoutMs())));
TTransport transport = getInputProtocol().getTransport();
if (!transport.isOpen()) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 12eea072f32..7cdd69d8a8c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -62,6 +62,8 @@ public abstract class AbstractThriftServiceThread extends
Thread {
private TProtocolFactory protocolFactory;
+ private TTransportFactory transportFactory;
+
// currently, we can reuse the ProtocolFactory instance.
private static TCompactProtocol.Factory compactProtocolFactory = new
TCompactProtocol.Factory();
private static TBinaryProtocol.Factory binaryProtocolFactory = new
TBinaryProtocol.Factory();
@@ -70,7 +72,9 @@ public abstract class AbstractThriftServiceThread extends
Thread {
protocolFactory = getProtocolFactory(compress);
}
- public abstract TTransportFactory getTTransportFactory();
+ public TTransportFactory getTTransportFactory() {
+ return transportFactory;
+ }
public static TProtocolFactory getProtocolFactory(boolean compress) {
if (compress) {
@@ -116,7 +120,9 @@ public abstract class AbstractThriftServiceThread extends
Thread {
boolean compress,
int connectionTimeoutInMS,
int maxReadBufferBytes,
- ServerType serverType) {
+ ServerType serverType,
+ TTransportFactory transportFactory) {
+ this.transportFactory = transportFactory;
initProtocolFactory(compress);
this.serviceName = serviceName;
try {
@@ -168,7 +174,9 @@ public abstract class AbstractThriftServiceThread extends
Thread {
boolean compress,
String keyStorePath,
String keyStorePwd,
- int clientTimeout) {
+ int clientTimeout,
+ TTransportFactory transportFactory) {
+ this.transportFactory = transportFactory;
initProtocolFactory(compress);
this.serviceName = serviceName;
@@ -200,7 +208,9 @@ public abstract class AbstractThriftServiceThread extends
Thread {
int maxWorkerThreads,
int timeoutSecond,
TServerEventHandler serverEventHandler,
- boolean compress) {
+ boolean compress,
+ TTransportFactory transportFactory) {
+ this.transportFactory = transportFactory;
initProtocolFactory(compress);
this.serviceName = serviceName;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index ad5db5504e5..77f25ff6ab0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.commons.service;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.server.TServerEventHandler;
@@ -44,7 +42,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
boolean compress,
int connectionTimeoutInMS,
int maxReadBufferBytes,
- ServerType serverType) {
+ ServerType serverType,
+ TTransportFactory transportFactory) {
super(
processor,
serviceName,
@@ -59,7 +58,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
compress,
connectionTimeoutInMS,
maxReadBufferBytes,
- serverType);
+ serverType,
+ transportFactory);
}
/** for synced ThriftServiceThread */
@@ -76,7 +76,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
boolean compress,
String keyStorePath,
String keyStorePwd,
- int clientTimeout) {
+ int clientTimeout,
+ TTransportFactory transportFactory) {
super(
processor,
serviceName,
@@ -89,7 +90,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
compress,
keyStorePath,
keyStorePwd,
- clientTimeout);
+ clientTimeout,
+ transportFactory);
}
public ThriftServiceThread(
@@ -101,7 +103,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
int maxWorkerThreads,
int timeoutSecond,
TServerEventHandler serverEventHandler,
- boolean compress) {
+ boolean compress,
+ TTransportFactory transportFactory) {
super(
processor,
serviceName,
@@ -111,11 +114,7 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
maxWorkerThreads,
timeoutSecond,
serverEventHandler,
- compress);
- }
-
- @Override
- public TTransportFactory getTTransportFactory() {
- return RpcTransportFactory.INSTANCE;
+ compress,
+ transportFactory);
}
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
index 771c6cf840a..386a212436f 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
import org.apache.iotdb.commons.service.ThriftServiceThread;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.thrift.server.TServerEventHandler;
@@ -69,7 +70,8 @@ public class MockInternalRPCService extends ThriftService
implements MockInterna
65535,
60,
mock(TServerEventHandler.class),
- false);
+ false,
+ DeepCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}