This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 41ee831b7e2 Pipe: Avoid req size exceeding the thrift max frame size 
by slicing the original req automatically (#13290)
41ee831b7e2 is described below

commit 41ee831b7e24a46549cd6946e5b677f5dd39edf2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Aug 26 17:29:43 2024 +0800

    Pipe: Avoid req size exceeding the thrift max frame size by slicing the 
original req automatically (#13290)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../pipe/metric/PipeDataNodeReceiverMetrics.java   |  63 ++++++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 237 +++++++++++++++------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  13 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   6 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   8 +
 .../pipe/connector/client/IoTDBSyncClient.java     |  76 +++++++
 .../thrift/common/PipeTransferSliceReqHandler.java | 135 ++++++++++++
 .../payload/thrift/request/PipeRequestType.java    |   3 +
 .../thrift/request/PipeTransferSliceReq.java       | 171 +++++++++++++++
 10 files changed, 643 insertions(+), 70 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 1009f8cd837..4964ff4c06b 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -255,6 +255,7 @@ public enum TSStatusCode {
   PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION(1809),
   PIPE_RECEIVER_USER_CONFLICT_EXCEPTION(1810),
   PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
+  PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
 
   // Subscription
   SUBSCRIPTION_VERSION_ERROR(1900),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
index eddad988041..8405e48085a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
@@ -45,6 +45,9 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
   private Timer transferSchemaPlanTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer transferSchemaSnapshotPieceTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer transferSchemaSnapshotSealTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigPlanTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferCompressedTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private static final String RECEIVER = "pipeDataNodeReceiver";
 
@@ -102,6 +105,18 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
     transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos);
   }
 
+  public void recordTransferConfigPlanTimer(long costTimeInNanos) {
+    transferConfigPlanTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferCompressedTimer(long costTimeInNanos) {
+    transferCompressedTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferSliceTimer(long costTimeInNanos) {
+    transferSliceTimer.updateNanos(costTimeInNanos);
+  }
+
   @Override
   public void bindTo(AbstractMetricService metricService) {
     bindToTimer(metricService);
@@ -212,6 +227,30 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
             RECEIVER,
             Tag.TYPE.toString(),
             "transferSchemaSnapshotSeal");
+    transferConfigPlanTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferConfigPlan");
+    transferCompressedTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferCompressed");
+    transferSliceTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferSlice");
   }
 
   @Override
@@ -233,6 +272,9 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
     transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
     metricService.remove(
         MetricType.TIMER,
@@ -325,6 +367,27 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
         RECEIVER,
         Tag.TYPE.toString(),
         "transferSchemaSnapshotSeal");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferConfigPlan");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferCompressed");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferSlice");
   }
 
   public static PipeDataNodeReceiverMetrics getInstance() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 40f5065caec..9d91706cdbf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -24,10 +24,12 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.utils.FileUtils;
@@ -133,6 +135,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new 
AtomicLong(0);
   protected final AtomicReference<String> configReceiverId = new 
AtomicReference<>();
 
+  private final PipeTransferSliceReqHandler sliceReqHandler = new 
PipeTransferSliceReqHandler();
+
   static {
     try {
       folderManager =
@@ -148,111 +152,185 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   @Override
   public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
     try {
-      long startTime = System.nanoTime();
+      final long startTime = System.nanoTime();
       final short rawRequestType = req.getType();
       if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
-        TPipeTransferResp resp;
-        switch (PipeRequestType.valueOf(rawRequestType)) {
+        final PipeRequestType requestType = 
PipeRequestType.valueOf(rawRequestType);
+        if (requestType != PipeRequestType.TRANSFER_SLICE) {
+          sliceReqHandler.clear();
+        }
+        switch (requestType) {
           case HANDSHAKE_DATANODE_V1:
-            resp =
-                handleTransferHandshakeV1(
+            {
+              try {
+                return handleTransferHandshakeV1(
                     
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordHandshakeDatanodeV1Timer(System.nanoTime() - 
startTime);
+              }
+            }
           case HANDSHAKE_DATANODE_V2:
-            resp =
-                handleTransferHandshakeV2(
+            {
+              try {
+                return handleTransferHandshakeV2(
                     
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordHandshakeDatanodeV2Timer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_INSERT_NODE:
-            resp =
-                handleTransferTabletInsertNode(
+            {
+              try {
+                return handleTransferTabletInsertNode(
                     PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletInsertNodeTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletInsertNodeTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_RAW:
-            resp = 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletRawTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletRawTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_BINARY:
-            resp =
-                
handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletBinaryTimer(System.nanoTime() - 
startTime);
-            return resp;
+            {
+              try {
+                return handleTransferTabletBinary(
+                    PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletBinaryTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_BATCH:
-            resp = 
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletBatchTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return handleTransferTabletBatch(
+                    PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletBatchTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_PIECE:
-            resp =
-                handleTransferFilePiece(
+            {
+              try {
+                return handleTransferFilePiece(
                     PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
                     req instanceof AirGapPseudoTPipeTransferRequest,
                     true);
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFilePieceTimer(System.nanoTime() - startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFilePieceTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_SEAL:
-            resp = 
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFileSealTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return handleTransferFileSealV1(
+                    PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFileSealTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_PIECE_WITH_MOD:
-            resp =
-                handleTransferFilePiece(
+            {
+              try {
+                return handleTransferFilePiece(
                     
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
                     req instanceof AirGapPseudoTPipeTransferRequest,
                     false);
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFilePieceWithModTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFilePieceWithModTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_SEAL_WITH_MOD:
-            resp =
-                handleTransferFileSealV2(
+            {
+              try {
+                return handleTransferFileSealV2(
                     
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFileSealWithModTimer(System.nanoTime() - 
startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFileSealWithModTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_SCHEMA_PLAN:
-            resp = 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferSchemaPlanTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSchemaPlanTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
-            resp =
-                handleTransferFilePiece(
+            {
+              try {
+                return handleTransferFilePiece(
                     
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
                     req instanceof AirGapPseudoTPipeTransferRequest,
                     false);
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() 
- startTime);
+              }
+            }
           case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
-            resp =
-                handleTransferFileSealV2(
+            {
+              try {
+                return handleTransferFileSealV2(
                     
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case HANDSHAKE_CONFIGNODE_V1:
           case HANDSHAKE_CONFIGNODE_V2:
           case TRANSFER_CONFIG_PLAN:
           case TRANSFER_CONFIG_SNAPSHOT_PIECE:
           case TRANSFER_CONFIG_SNAPSHOT_SEAL:
-            // Config requests will first be received by the DataNode receiver,
-            // then transferred to ConfigNode receiver to execute.
-            resp = handleTransferConfigPlan(req);
-            return resp;
+            {
+              try {
+                // Config requests will first be received by the DataNode 
receiver,
+                // then transferred to ConfigNode receiver to execute.
+                return handleTransferConfigPlan(req);
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferConfigPlanTimer(System.nanoTime() - 
startTime);
+              }
+            }
+          case TRANSFER_SLICE:
+            {
+              try {
+                return 
handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSliceTimer(System.nanoTime() - startTime);
+              }
+            }
           case TRANSFER_COMPRESSED:
-            resp = 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
-            return resp;
+            {
+              try {
+                return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferCompressedTimer(System.nanoTime() - 
startTime);
+              }
+            }
           default:
             break;
         }
@@ -442,6 +520,25 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return configReceiverId.get();
   }
 
+  private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq 
pipeTransferSliceReq) {
+    final boolean isInorder = 
sliceReqHandler.receiveSlice(pipeTransferSliceReq);
+    if (!isInorder) {
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
+              "Slice request is out of order, please check the request 
sequence."));
+    }
+    final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
+    if (!req.isPresent()) {
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.SUCCESS_STATUS,
+              "Slice received, waiting for more slices to complete the 
request."));
+    }
+    // sliceReqHandler will be cleared in the receive(req) method
+    return receive(req.get());
+  }
+
   /**
    * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, 
the returned {@link
    * TSStatus} will use sub-status to record the endpoint for redirection. 
Each sub-status records
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3aa185b4d3d..cc80dcab4ef 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
 import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.rpc.RpcUtils;
 
 import org.apache.tsfile.fileSystem.FSType;
 import org.slf4j.Logger;
@@ -224,6 +225,9 @@ public class CommonConfig {
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
   private int rateLimiterHotReloadCheckIntervalMs = 1000;
 
+  private int pipeConnectorRequestSliceThresholdBytes =
+      (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
+
   private boolean isSeperatedPipeHeartbeatEnabled = true;
   private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -1124,6 +1128,15 @@ public class CommonConfig {
     this.rateLimiterHotReloadCheckIntervalMs = 
rateLimiterHotReloadCheckIntervalMs;
   }
 
+  public int getPipeConnectorRequestSliceThresholdBytes() {
+    return pipeConnectorRequestSliceThresholdBytes;
+  }
+
+  public void setPipeConnectorRequestSliceThresholdBytes(
+      int pipeConnectorRequestSliceThresholdBytes) {
+    this.pipeConnectorRequestSliceThresholdBytes = 
pipeConnectorRequestSliceThresholdBytes;
+  }
+
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
     return twoStageAggregateMaxCombinerLiveTimeInMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 36003ec30e1..c41c932c395 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -409,6 +409,12 @@ public class CommonDescriptor {
                 "rate_limiter_hot_reload_check_interval_ms",
                 
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
 
+    config.setPipeConnectorRequestSliceThresholdBytes(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_connector_request_slice_threshold_bytes",
+                
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+
     config.setSeperatedPipeHeartbeatEnabled(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b215c7a3200..fc9fb0911a0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -144,6 +144,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
   }
 
+  public int getPipeConnectorRequestSliceThresholdBytes() {
+    return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+  }
+
   public float getPipeLeaderCacheMemoryUsagePercentage() {
     return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
   }
@@ -375,6 +379,10 @@ public class PipeConfig {
     LOGGER.info(
         "RateLimiterHotReloadCheckIntervalMs: {}", 
getRateLimiterHotReloadCheckIntervalMs());
 
+    LOGGER.info(
+        "PipeConnectorRequestSliceThresholdBytes: {}",
+        getPipeConnectorRequestSliceThresholdBytes());
+
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
         "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index f15934afd5e..d79c430474c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -22,16 +22,32 @@ package org.apache.iotdb.commons.pipe.connector.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class IoTDBSyncClient extends IClientRPCService.Client
     implements ThriftClient, AutoCloseable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClient.class);
+
+  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0);
+
   private final String ipAddress;
   private final int port;
   private final TEndPoint endPoint;
@@ -82,6 +98,66 @@ public class IoTDBSyncClient extends IClientRPCService.Client
     ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
+  @Override
+  public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
+    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+    if (req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion()
+        || req.body.limit() < bodySizeLimit) {
+      return super.pipeTransfer(req);
+    }
+
+    LOGGER.warn(
+        "The body size of the request is too large. The request will be 
sliced. Origin req: {}-{}. "
+            + "Request body size: {}, threshold: {}",
+        req.getVersion(),
+        req.getType(),
+        req.body.limit(),
+        bodySizeLimit);
+
+    try {
+      final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
+      // Slice the buffer to avoid the buffer being too large
+      final int sliceCount =
+          req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit 
== 0 ? 0 : 1);
+      for (int i = 0; i < sliceCount; ++i) {
+        final int startIndexInBody = i * bodySizeLimit;
+        final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, 
req.body.limit());
+        final TPipeTransferResp sliceResp =
+            super.pipeTransfer(
+                PipeTransferSliceReq.toTPipeTransferReq(
+                    sliceOrderId,
+                    req.getType(),
+                    i,
+                    sliceCount,
+                    req.body.duplicate(),
+                    startIndexInBody,
+                    endIndexInBody));
+
+        if (i == sliceCount - 1) {
+          return sliceResp;
+        }
+
+        if (sliceResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new PipeConnectionException(
+              String.format(
+                  "Failed to transfer slice. Origin req: %s-%s, slice index: 
%d, slice count: %d. Reason: %s",
+                  req.getVersion(), req.getType(), i, sliceCount, 
sliceResp.getStatus()));
+        }
+      }
+
+      // Should not reach here
+      return super.pipeTransfer(req);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to transfer slice. Origin req: {}-{}. Retry the whole 
transfer.",
+          req.getVersion(),
+          req.getType(),
+          e);
+      // Fall back to the original behavior
+      return super.pipeTransfer(req);
+    }
+  }
+
   @Override
   public void close() throws Exception {
     invalidate();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
new file mode 100644
index 00000000000..b3bd16acc7d
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.common;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class PipeTransferSliceReqHandler {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferSliceReqHandler.class);
+
+  private int orderId = -1;
+
+  private short originReqType = -1;
+  private int originBodySize = -1;
+
+  private int sliceCount = -1;
+  private final List<byte[]> sliceBodies = new ArrayList<>();
+
+  public boolean receiveSlice(final PipeTransferSliceReq req) {
+    if (orderId == -1
+        || originReqType == -1
+        || originBodySize == -1
+        || sliceCount == -1
+        || sliceBodies.isEmpty()) {
+      if (orderId == -1
+          && originReqType == -1
+          && originBodySize == -1
+          && sliceCount == -1
+          && sliceBodies.isEmpty()) {
+        orderId = req.getOrderId();
+        originReqType = req.getOriginReqType();
+        originBodySize = req.getOriginBodySize();
+        sliceCount = req.getSliceCount();
+      } else {
+        LOGGER.warn(
+            "Invalid state: orderId={}, originReqType={}, originBodySize={}, 
sliceCount={}, sliceBodies.size={}",
+            orderId,
+            originReqType,
+            originBodySize,
+            sliceCount,
+            sliceBodies.size());
+        clear();
+        return false;
+      }
+    }
+
+    if (orderId != req.getOrderId()) {
+      LOGGER.warn("Order ID mismatch: expected {}, actual {}", orderId, 
req.getOrderId());
+      clear();
+      return false;
+    }
+    if (originReqType != req.getOriginReqType()) {
+      LOGGER.warn(
+          "Origin request type mismatch: expected {}, actual {}",
+          originReqType,
+          req.getOriginReqType());
+      clear();
+      return false;
+    }
+    if (originBodySize != req.getOriginBodySize()) {
+      LOGGER.warn(
+          "Origin body size mismatch: expected {}, actual {}",
+          originBodySize,
+          req.getOriginBodySize());
+      clear();
+      return false;
+    }
+    if (sliceCount != req.getSliceCount()) {
+      LOGGER.warn("Slice count mismatch: expected {}, actual {}", sliceCount, 
req.getSliceCount());
+      clear();
+      return false;
+    }
+    if (sliceBodies.size() != req.getSliceIndex()) {
+      LOGGER.warn(
+          "Invalid slice index: expected {}, actual {}", sliceBodies.size(), 
req.getSliceIndex());
+      clear();
+      return false;
+    }
+
+    sliceBodies.add(req.getSliceBody());
+    return true;
+  }
+
+  public Optional<TPipeTransferReq> makeReqIfComplete() {
+    if (sliceBodies.size() != sliceCount) {
+      return Optional.empty();
+    }
+
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    req.type = originReqType;
+
+    final ByteBuffer body = ByteBuffer.allocate(originBodySize);
+    sliceBodies.forEach(body::put);
+    body.flip();
+    req.body = body;
+
+    return Optional.of(req);
+  }
+
+  public void clear() {
+    orderId = -1;
+    originReqType = -1;
+    originBodySize = -1;
+    sliceCount = -1;
+    sliceBodies.clear();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
index 003c8b9afb3..0f9b6f4dafb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
@@ -53,6 +53,9 @@ public enum PipeRequestType {
 
   // RPC Compression
   TRANSFER_COMPRESSED((short) 300),
+
+  // Fallback Handling
+  TRANSFER_SLICE((short) 400),
   ;
 
   private final short type;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
new file mode 100644
index 00000000000..4df6008400d
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A {@link TPipeTransferReq} of type {@link PipeRequestType#TRANSFER_SLICE} that carries one piece
+ * of the body of another ("origin") request.
+ *
+ * <p>The thrift {@code body} of a slice request holds, in this order: order id, origin request
+ * type, origin body size, the slice bytes (written as a {@link Binary}), slice index and slice
+ * count. The same values are additionally kept in the fields of this class.
+ */
+public class PipeTransferSliceReq extends TPipeTransferReq {
+
+  private transient int orderId;
+
+  private transient short originReqType;
+  private transient int originBodySize;
+
+  private transient byte[] sliceBody;
+
+  private transient int sliceIndex;
+  private transient int sliceCount;
+
+  /** Returns the order id shared by all slices of one origin request. */
+  public int getOrderId() {
+    return orderId;
+  }
+
+  /** Returns the request type of the origin request this slice was taken from. */
+  public short getOriginReqType() {
+    return originReqType;
+  }
+
+  /** Returns the body size of the origin request, as recorded when the slice was created. */
+  public int getOriginBodySize() {
+    return originBodySize;
+  }
+
+  /** Returns the bytes of the origin body carried by this slice (the internal array, no copy). */
+  public byte[] getSliceBody() {
+    return sliceBody;
+  }
+
+  /** Returns the index of this slice within its sequence. */
+  public int getSliceIndex() {
+    return sliceIndex;
+  }
+
+  /** Returns the number of slices the origin request was cut into. */
+  public int getSliceCount() {
+    return sliceCount;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Creates the slice request covering the bytes {@code [startIndexInBody, endIndexInBody)} of an
+   * origin body.
+   *
+   * @param orderId id shared by all slices of the origin request
+   * @param originReqType request type of the origin request
+   * @param sliceIndex index of this slice
+   * @param sliceCount total number of slices
+   * @param duplicatedOriginBody the origin body; its position is changed by this method and its
+   *     limit is recorded as the origin body size
+   * @param startIndexInBody index of the first byte to copy
+   * @param endIndexInBody index after the last byte to copy
+   * @return the slice request with version {@link IoTDBConnectorRequestVersion#VERSION_1}, type
+   *     {@link PipeRequestType#TRANSFER_SLICE} and a serialized body
+   * @throws IOException if writing the body to the in-memory stream fails
+   */
+  public static PipeTransferSliceReq toTPipeTransferReq(
+      final int orderId,
+      final short originReqType,
+      final int sliceIndex,
+      final int sliceCount,
+      final ByteBuffer duplicatedOriginBody,
+      final int startIndexInBody,
+      final int endIndexInBody)
+      throws IOException {
+    // Copy the requested range out of the origin body
+    final byte[] payload = new byte[endIndexInBody - startIndexInBody];
+    duplicatedOriginBody.position(startIndexInBody);
+    duplicatedOriginBody.get(payload);
+
+    final PipeTransferSliceReq result = new PipeTransferSliceReq();
+    result.orderId = orderId;
+    result.originReqType = originReqType;
+    result.originBodySize = duplicatedOriginBody.limit();
+    result.sliceBody = payload;
+    result.sliceIndex = sliceIndex;
+    result.sliceCount = sliceCount;
+
+    result.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    result.type = PipeRequestType.TRANSFER_SLICE.getType();
+    result.body = serializeBody(result);
+
+    return result;
+  }
+
+  /** Writes the slice fields of {@code sliceReq} in wire order and wraps the written bytes. */
+  private static ByteBuffer serializeBody(final PipeTransferSliceReq sliceReq)
+      throws IOException {
+    try (final PublicBAOS bytes = new PublicBAOS();
+        final DataOutputStream out = new DataOutputStream(bytes)) {
+      ReadWriteIOUtils.write(sliceReq.orderId, out);
+
+      ReadWriteIOUtils.write(sliceReq.originReqType, out);
+      ReadWriteIOUtils.write(sliceReq.originBodySize, out);
+
+      ReadWriteIOUtils.write(new Binary(sliceReq.sliceBody), out);
+
+      ReadWriteIOUtils.write(sliceReq.sliceIndex, out);
+      ReadWriteIOUtils.write(sliceReq.sliceCount, out);
+
+      // Wrap only the bytes actually written, not the whole backing array
+      return ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size());
+    }
+  }
+
+  /**
+   * Parses a slice request from the body of a received {@link TPipeTransferReq}.
+   *
+   * <p>The fields are read from {@code transferReq.body} in the order they are written by {@link
+   * #toTPipeTransferReq}. The returned request takes over version and type of {@code transferReq}
+   * and shares its body buffer instance.
+   *
+   * @param transferReq the received request carrying a serialized slice
+   * @return the parsed slice request
+   */
+  public static PipeTransferSliceReq fromTPipeTransferReq(final TPipeTransferReq transferReq) {
+    final ByteBuffer buffer = transferReq.body;
+    final PipeTransferSliceReq result = new PipeTransferSliceReq();
+
+    result.orderId = ReadWriteIOUtils.readInt(buffer);
+
+    result.originReqType = ReadWriteIOUtils.readShort(buffer);
+    result.originBodySize = ReadWriteIOUtils.readInt(buffer);
+
+    result.sliceBody = ReadWriteIOUtils.readBinary(buffer).getValues();
+
+    result.sliceIndex = ReadWriteIOUtils.readInt(buffer);
+    result.sliceCount = ReadWriteIOUtils.readInt(buffer);
+
+    result.version = transferReq.version;
+    result.type = transferReq.type;
+    result.body = transferReq.body;
+
+    return result;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /**
+   * Two slice requests are equal if they are of the same class and all slice fields as well as
+   * version, type and body are equal; the slice bytes are compared by content.
+   */
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final PipeTransferSliceReq other = (PipeTransferSliceReq) obj;
+    final boolean sameSliceFields =
+        orderId == other.orderId
+            && originReqType == other.originReqType
+            && originBodySize == other.originBodySize
+            && Arrays.equals(sliceBody, other.sliceBody)
+            && sliceIndex == other.sliceIndex
+            && sliceCount == other.sliceCount;
+    return sameSliceFields
+        && Objects.equals(version, other.version)
+        && Objects.equals(type, other.type)
+        && Objects.equals(body, other.body);
+  }
+
+  /** Hashes the same fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    final int sliceBodyHash = Arrays.hashCode(sliceBody);
+    return Objects.hash(
+        orderId,
+        originReqType,
+        originBodySize,
+        sliceBodyHash,
+        sliceIndex,
+        sliceCount,
+        version,
+        type,
+        body);
+  }
+}


Reply via email to