This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 41ee831b7e2 Pipe: Avoid req size excceeding the thrift max frame size
by slicing the original req automatically (#13290)
41ee831b7e2 is described below
commit 41ee831b7e24a46549cd6946e5b677f5dd39edf2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Aug 26 17:29:43 2024 +0800
Pipe: Avoid req size excceeding the thrift max frame size by slicing the
original req automatically (#13290)
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../pipe/metric/PipeDataNodeReceiverMetrics.java | 63 ++++++
.../protocol/thrift/IoTDBDataNodeReceiver.java | 237 +++++++++++++++------
.../apache/iotdb/commons/conf/CommonConfig.java | 13 ++
.../iotdb/commons/conf/CommonDescriptor.java | 6 +
.../iotdb/commons/pipe/config/PipeConfig.java | 8 +
.../pipe/connector/client/IoTDBSyncClient.java | 76 +++++++
.../thrift/common/PipeTransferSliceReqHandler.java | 135 ++++++++++++
.../payload/thrift/request/PipeRequestType.java | 3 +
.../thrift/request/PipeTransferSliceReq.java | 171 +++++++++++++++
10 files changed, 643 insertions(+), 70 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 1009f8cd837..4964ff4c06b 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -255,6 +255,7 @@ public enum TSStatusCode {
PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION(1809),
PIPE_RECEIVER_USER_CONFLICT_EXCEPTION(1810),
PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
+ PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
// Subscription
SUBSCRIPTION_VERSION_ERROR(1900),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
index eddad988041..8405e48085a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
@@ -45,6 +45,9 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
private Timer transferSchemaPlanTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer transferSchemaSnapshotPieceTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer transferSchemaSnapshotSealTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer transferConfigPlanTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer transferCompressedTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private static final String RECEIVER = "pipeDataNodeReceiver";
@@ -102,6 +105,18 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos);
}
+ public void recordTransferConfigPlanTimer(long costTimeInNanos) {
+ transferConfigPlanTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordTransferCompressedTimer(long costTimeInNanos) {
+ transferCompressedTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordTransferSliceTimer(long costTimeInNanos) {
+ transferSliceTimer.updateNanos(costTimeInNanos);
+ }
+
@Override
public void bindTo(AbstractMetricService metricService) {
bindToTimer(metricService);
@@ -212,6 +227,30 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
RECEIVER,
Tag.TYPE.toString(),
"transferSchemaSnapshotSeal");
+ transferConfigPlanTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_DATANODE_RECEIVER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.TYPE.toString(),
+ "transferConfigPlan");
+ transferCompressedTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_DATANODE_RECEIVER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.TYPE.toString(),
+ "transferCompressed");
+ transferSliceTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_DATANODE_RECEIVER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.TYPE.toString(),
+ "transferSlice");
}
@Override
@@ -233,6 +272,9 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
metricService.remove(
MetricType.TIMER,
@@ -325,6 +367,27 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
RECEIVER,
Tag.TYPE.toString(),
"transferSchemaSnapshotSeal");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_DATANODE_RECEIVER.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.TYPE.toString(),
+ "transferConfigPlan");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_DATANODE_RECEIVER.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.TYPE.toString(),
+ "transferCompressed");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_DATANODE_RECEIVER.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.TYPE.toString(),
+ "transferSlice");
}
public static PipeDataNodeReceiverMetrics getInstance() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 40f5065caec..9d91706cdbf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -24,10 +24,12 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.utils.FileUtils;
@@ -133,6 +135,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new
AtomicLong(0);
protected final AtomicReference<String> configReceiverId = new
AtomicReference<>();
+ private final PipeTransferSliceReqHandler sliceReqHandler = new
PipeTransferSliceReqHandler();
+
static {
try {
folderManager =
@@ -148,111 +152,185 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
@Override
public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
try {
- long startTime = System.nanoTime();
+ final long startTime = System.nanoTime();
final short rawRequestType = req.getType();
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
- TPipeTransferResp resp;
- switch (PipeRequestType.valueOf(rawRequestType)) {
+ final PipeRequestType requestType =
PipeRequestType.valueOf(rawRequestType);
+ if (requestType != PipeRequestType.TRANSFER_SLICE) {
+ sliceReqHandler.clear();
+ }
+ switch (requestType) {
case HANDSHAKE_DATANODE_V1:
- resp =
- handleTransferHandshakeV1(
+ {
+ try {
+ return handleTransferHandshakeV1(
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime);
- return resp;
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordHandshakeDatanodeV1Timer(System.nanoTime() -
startTime);
+ }
+ }
case HANDSHAKE_DATANODE_V2:
- resp =
- handleTransferHandshakeV2(
+ {
+ try {
+ return handleTransferHandshakeV2(
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime);
- return resp;
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordHandshakeDatanodeV2Timer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TABLET_INSERT_NODE:
- resp =
- handleTransferTabletInsertNode(
+ {
+ try {
+ return handleTransferTabletInsertNode(
PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTabletInsertNodeTimer(System.nanoTime() -
startTime);
- return resp;
+
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletInsertNodeTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TABLET_RAW:
- resp =
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTabletRawTimer(System.nanoTime() - startTime);
- return resp;
+ {
+ try {
+ return
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletRawTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TABLET_BINARY:
- resp =
-
handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTabletBinaryTimer(System.nanoTime() -
startTime);
- return resp;
+ {
+ try {
+ return handleTransferTabletBinary(
+ PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletBinaryTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TABLET_BATCH:
- resp =
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTabletBatchTimer(System.nanoTime() - startTime);
- return resp;
+ {
+ try {
+ return handleTransferTabletBatch(
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletBatchTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TS_FILE_PIECE:
- resp =
- handleTransferFilePiece(
+ {
+ try {
+ return handleTransferFilePiece(
PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
true);
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTsFilePieceTimer(System.nanoTime() - startTime);
- return resp;
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFilePieceTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TS_FILE_SEAL:
- resp =
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTsFileSealTimer(System.nanoTime() - startTime);
- return resp;
+ {
+ try {
+ return handleTransferFileSealV1(
+ PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFileSealTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TS_FILE_PIECE_WITH_MOD:
- resp =
- handleTransferFilePiece(
+ {
+ try {
+ return handleTransferFilePiece(
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
false);
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTsFilePieceWithModTimer(System.nanoTime() -
startTime);
- return resp;
+
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFilePieceWithModTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_TS_FILE_SEAL_WITH_MOD:
- resp =
- handleTransferFileSealV2(
+ {
+ try {
+ return handleTransferFileSealV2(
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferTsFileSealWithModTimer(System.nanoTime() -
startTime);
- return resp;
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFileSealWithModTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_SCHEMA_PLAN:
- resp =
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferSchemaPlanTimer(System.nanoTime() - startTime);
- return resp;
+ {
+ try {
+ return
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSchemaPlanTimer(System.nanoTime() -
startTime);
+ }
+ }
case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
- resp =
- handleTransferFilePiece(
+ {
+ try {
+ return handleTransferFilePiece(
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
false);
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() -
startTime);
- return resp;
+
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSchemaSnapshotPieceTimer(System.nanoTime()
- startTime);
+ }
+ }
case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
- resp =
- handleTransferFileSealV2(
+ {
+ try {
+ return handleTransferFileSealV2(
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
- PipeDataNodeReceiverMetrics.getInstance()
- .recordTransferSchemaSnapshotSealTimer(System.nanoTime() -
startTime);
- return resp;
+
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSchemaSnapshotSealTimer(System.nanoTime() -
startTime);
+ }
+ }
case HANDSHAKE_CONFIGNODE_V1:
case HANDSHAKE_CONFIGNODE_V2:
case TRANSFER_CONFIG_PLAN:
case TRANSFER_CONFIG_SNAPSHOT_PIECE:
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
- // Config requests will first be received by the DataNode receiver,
- // then transferred to ConfigNode receiver to execute.
- resp = handleTransferConfigPlan(req);
- return resp;
+ {
+ try {
+ // Config requests will first be received by the DataNode
receiver,
+ // then transferred to ConfigNode receiver to execute.
+ return handleTransferConfigPlan(req);
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferConfigPlanTimer(System.nanoTime() -
startTime);
+ }
+ }
+ case TRANSFER_SLICE:
+ {
+ try {
+ return
handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSliceTimer(System.nanoTime() - startTime);
+ }
+ }
case TRANSFER_COMPRESSED:
- resp =
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
- return resp;
+ {
+ try {
+ return
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+ } finally {
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferCompressedTimer(System.nanoTime() -
startTime);
+ }
+ }
default:
break;
}
@@ -442,6 +520,25 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return configReceiverId.get();
}
+ private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq
pipeTransferSliceReq) {
+ final boolean isInorder =
sliceReqHandler.receiveSlice(pipeTransferSliceReq);
+ if (!isInorder) {
+ return new TPipeTransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
+ "Slice request is out of order, please check the request
sequence."));
+ }
+ final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
+ if (!req.isPresent()) {
+ return new TPipeTransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SUCCESS_STATUS,
+ "Slice received, waiting for more slices to complete the
request."));
+ }
+ // sliceReqHandler will be cleared in the receive(req) method
+ return receive(req.get());
+ }
+
/**
* For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement},
the returned {@link
* TSStatus} will use sub-status to record the endpoint for redirection.
Each sub-status records
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3aa185b4d3d..cc80dcab4ef 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.tsfile.fileSystem.FSType;
import org.slf4j.Logger;
@@ -224,6 +225,9 @@ public class CommonConfig {
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
+ private int pipeConnectorRequestSliceThresholdBytes =
+ (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
+
private boolean isSeperatedPipeHeartbeatEnabled = true;
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -1124,6 +1128,15 @@ public class CommonConfig {
this.rateLimiterHotReloadCheckIntervalMs =
rateLimiterHotReloadCheckIntervalMs;
}
+ public int getPipeConnectorRequestSliceThresholdBytes() {
+ return pipeConnectorRequestSliceThresholdBytes;
+ }
+
+ public void setPipeConnectorRequestSliceThresholdBytes(
+ int pipeConnectorRequestSliceThresholdBytes) {
+ this.pipeConnectorRequestSliceThresholdBytes =
pipeConnectorRequestSliceThresholdBytes;
+ }
+
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
return twoStageAggregateMaxCombinerLiveTimeInMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 36003ec30e1..c41c932c395 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -409,6 +409,12 @@ public class CommonDescriptor {
"rate_limiter_hot_reload_check_interval_ms",
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
+ config.setPipeConnectorRequestSliceThresholdBytes(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_connector_request_slice_threshold_bytes",
+
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+
config.setSeperatedPipeHeartbeatEnabled(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b215c7a3200..fc9fb0911a0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -144,6 +144,10 @@ public class PipeConfig {
return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
}
+ public int getPipeConnectorRequestSliceThresholdBytes() {
+ return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+ }
+
public float getPipeLeaderCacheMemoryUsagePercentage() {
return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
}
@@ -375,6 +379,10 @@ public class PipeConfig {
LOGGER.info(
"RateLimiterHotReloadCheckIntervalMs: {}",
getRateLimiterHotReloadCheckIntervalMs());
+ LOGGER.info(
+ "PipeConnectorRequestSliceThresholdBytes: {}",
+ getPipeConnectorRequestSliceThresholdBytes());
+
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
"PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index f15934afd5e..d79c430474c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -22,16 +22,32 @@ package org.apache.iotdb.commons.pipe.connector.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
public class IoTDBSyncClient extends IClientRPCService.Client
implements ThriftClient, AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncClient.class);
+
+ private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
+
private final String ipAddress;
private final int port;
private final TEndPoint endPoint;
@@ -82,6 +98,66 @@ public class IoTDBSyncClient extends IClientRPCService.Client
((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
}
+ @Override
+ public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws
TException {
+ final int bodySizeLimit =
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+ if (req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion()
+ || req.body.limit() < bodySizeLimit) {
+ return super.pipeTransfer(req);
+ }
+
+ LOGGER.warn(
+ "The body size of the request is too large. The request will be
sliced. Origin req: {}-{}. "
+ + "Request body size: {}, threshold: {}",
+ req.getVersion(),
+ req.getType(),
+ req.body.limit(),
+ bodySizeLimit);
+
+ try {
+ final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
+ // Slice the buffer to avoid the buffer being too large
+ final int sliceCount =
+ req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit
== 0 ? 0 : 1);
+ for (int i = 0; i < sliceCount; ++i) {
+ final int startIndexInBody = i * bodySizeLimit;
+ final int endIndexInBody = Math.min((i + 1) * bodySizeLimit,
req.body.limit());
+ final TPipeTransferResp sliceResp =
+ super.pipeTransfer(
+ PipeTransferSliceReq.toTPipeTransferReq(
+ sliceOrderId,
+ req.getType(),
+ i,
+ sliceCount,
+ req.body.duplicate(),
+ startIndexInBody,
+ endIndexInBody));
+
+ if (i == sliceCount - 1) {
+ return sliceResp;
+ }
+
+ if (sliceResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeConnectionException(
+ String.format(
+ "Failed to transfer slice. Origin req: %s-%s, slice index:
%d, slice count: %d. Reason: %s",
+ req.getVersion(), req.getType(), i, sliceCount,
sliceResp.getStatus()));
+ }
+ }
+
+ // Should not reach here
+ return super.pipeTransfer(req);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to transfer slice. Origin req: {}-{}. Retry the whole
transfer.",
+ req.getVersion(),
+ req.getType(),
+ e);
+ // Fall back to the original behavior
+ return super.pipeTransfer(req);
+ }
+ }
+
@Override
public void close() throws Exception {
invalidate();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
new file mode 100644
index 00000000000..b3bd16acc7d
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.common;
+
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class PipeTransferSliceReqHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferSliceReqHandler.class);
+
+ private int orderId = -1;
+
+ private short originReqType = -1;
+ private int originBodySize = -1;
+
+ private int sliceCount = -1;
+ private final List<byte[]> sliceBodies = new ArrayList<>();
+
+ public boolean receiveSlice(final PipeTransferSliceReq req) {
+ if (orderId == -1
+ || originReqType == -1
+ || originBodySize == -1
+ || sliceCount == -1
+ || sliceBodies.isEmpty()) {
+ if (orderId == -1
+ && originReqType == -1
+ && originBodySize == -1
+ && sliceCount == -1
+ && sliceBodies.isEmpty()) {
+ orderId = req.getOrderId();
+ originReqType = req.getOriginReqType();
+ originBodySize = req.getOriginBodySize();
+ sliceCount = req.getSliceCount();
+ } else {
+ LOGGER.warn(
+ "Invalid state: orderId={}, originReqType={}, originBodySize={},
sliceCount={}, sliceBodies.size={}",
+ orderId,
+ originReqType,
+ originBodySize,
+ sliceCount,
+ sliceBodies.size());
+ clear();
+ return false;
+ }
+ }
+
+ if (orderId != req.getOrderId()) {
+ LOGGER.warn("Order ID mismatch: expected {}, actual {}", orderId,
req.getOrderId());
+ clear();
+ return false;
+ }
+ if (originReqType != req.getOriginReqType()) {
+ LOGGER.warn(
+ "Origin request type mismatch: expected {}, actual {}",
+ originReqType,
+ req.getOriginReqType());
+ clear();
+ return false;
+ }
+ if (originBodySize != req.getOriginBodySize()) {
+ LOGGER.warn(
+ "Origin body size mismatch: expected {}, actual {}",
+ originBodySize,
+ req.getOriginBodySize());
+ clear();
+ return false;
+ }
+ if (sliceCount != req.getSliceCount()) {
+ LOGGER.warn("Slice count mismatch: expected {}, actual {}", sliceCount,
req.getSliceCount());
+ clear();
+ return false;
+ }
+ if (sliceBodies.size() != req.getSliceIndex()) {
+ LOGGER.warn(
+ "Invalid slice index: expected {}, actual {}", sliceBodies.size(),
req.getSliceIndex());
+ clear();
+ return false;
+ }
+
+ sliceBodies.add(req.getSliceBody());
+ return true;
+ }
+
+ public Optional<TPipeTransferReq> makeReqIfComplete() {
+ if (sliceBodies.size() != sliceCount) {
+ return Optional.empty();
+ }
+
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ req.type = originReqType;
+
+ final ByteBuffer body = ByteBuffer.allocate(originBodySize);
+ sliceBodies.forEach(body::put);
+ body.flip();
+ req.body = body;
+
+ return Optional.of(req);
+ }
+
+ public void clear() {
+ orderId = -1;
+ originReqType = -1;
+ originBodySize = -1;
+ sliceCount = -1;
+ sliceBodies.clear();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
index 003c8b9afb3..0f9b6f4dafb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
@@ -53,6 +53,9 @@ public enum PipeRequestType {
// RPC Compression
TRANSFER_COMPRESSED((short) 300),
+
+ // Fallback Handling
+ TRANSFER_SLICE((short) 400),
;
private final short type;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
new file mode 100644
index 00000000000..4df6008400d
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class PipeTransferSliceReq extends TPipeTransferReq {
+
+ private transient int orderId;
+
+ private transient short originReqType;
+ private transient int originBodySize;
+
+ private transient byte[] sliceBody;
+
+ private transient int sliceIndex;
+ private transient int sliceCount;
+
+ public int getOrderId() {
+ return orderId;
+ }
+
+ public short getOriginReqType() {
+ return originReqType;
+ }
+
+ public int getOriginBodySize() {
+ return originBodySize;
+ }
+
+ public byte[] getSliceBody() {
+ return sliceBody;
+ }
+
+ public int getSliceIndex() {
+ return sliceIndex;
+ }
+
+ public int getSliceCount() {
+ return sliceCount;
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ public static PipeTransferSliceReq toTPipeTransferReq(
+ final int orderId,
+ final short originReqType,
+ final int sliceIndex,
+ final int sliceCount,
+ final ByteBuffer duplicatedOriginBody,
+ final int startIndexInBody,
+ final int endIndexInBody)
+ throws IOException {
+ final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
+
+ sliceReq.orderId = orderId;
+
+ sliceReq.originReqType = originReqType;
+ sliceReq.originBodySize = duplicatedOriginBody.limit();
+
+ sliceReq.sliceBody = new byte[endIndexInBody - startIndexInBody];
+ duplicatedOriginBody.position(startIndexInBody);
+ duplicatedOriginBody.get(sliceReq.sliceBody);
+
+ sliceReq.sliceIndex = sliceIndex;
+ sliceReq.sliceCount = sliceCount;
+
+ sliceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ sliceReq.type = PipeRequestType.TRANSFER_SLICE.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(sliceReq.orderId, outputStream);
+
+ ReadWriteIOUtils.write(sliceReq.originReqType, outputStream);
+ ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);
+
+ ReadWriteIOUtils.write(new Binary(sliceReq.sliceBody), outputStream);
+
+ ReadWriteIOUtils.write(sliceReq.sliceIndex, outputStream);
+ ReadWriteIOUtils.write(sliceReq.sliceCount, outputStream);
+
+ sliceReq.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return sliceReq;
+ }
+
+ public static PipeTransferSliceReq fromTPipeTransferReq(final
TPipeTransferReq transferReq) {
+ final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
+
+ sliceReq.orderId = ReadWriteIOUtils.readInt(transferReq.body);
+
+ sliceReq.originReqType = ReadWriteIOUtils.readShort(transferReq.body);
+ sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body);
+
+ sliceReq.sliceBody =
ReadWriteIOUtils.readBinary(transferReq.body).getValues();
+
+ sliceReq.sliceIndex = ReadWriteIOUtils.readInt(transferReq.body);
+ sliceReq.sliceCount = ReadWriteIOUtils.readInt(transferReq.body);
+
+ sliceReq.version = transferReq.version;
+ sliceReq.type = transferReq.type;
+ sliceReq.body = transferReq.body;
+
+ return sliceReq;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final PipeTransferSliceReq that = (PipeTransferSliceReq) obj;
+ return Objects.equals(orderId, that.orderId)
+ && Objects.equals(originReqType, that.originReqType)
+ && Objects.equals(originBodySize, that.originBodySize)
+ && Arrays.equals(sliceBody, that.sliceBody)
+ && Objects.equals(sliceIndex, that.sliceIndex)
+ && Objects.equals(sliceCount, that.sliceCount)
+ && Objects.equals(version, that.version)
+ && Objects.equals(type, that.type)
+ && Objects.equals(body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ orderId,
+ originReqType,
+ originBodySize,
+ Arrays.hashCode(sliceBody),
+ sliceIndex,
+ sliceCount,
+ version,
+ type,
+ body);
+ }
+}