This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 28bb9b8cd6c Pipe: Support pipe's endpoint & global connector transfer 
rate limit & Fix RPC compression not enabled in stream batch mode (#12543)
28bb9b8cd6c is described below

commit 28bb9b8cd6cf54506d90eb477384023f581293d0
Author: Itami Sho <[email protected]>
AuthorDate: Mon May 27 12:22:11 2024 +0800

    Pipe: Support pipe's endpoint & global connector transfer rate limit & Fix 
RPC compression not enabled in stream batch mode (#12543)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  51 +++++----
 .../protocol/IoTDBConfigRegionConnector.java       |  52 +++++----
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  14 ++-
 .../payload/evolvable/builder/PipeEventBatch.java  |  18 +++
 .../airgap/IoTDBDataNodeAirGapConnector.java       |  20 ++--
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  55 +++++----
 .../airgap/IoTDBSchemaRegionAirGapConnector.java   |  32 +++---
 .../async/IoTDBDataRegionAsyncConnector.java       |  11 +-
 .../PipeTransferTabletBatchEventHandler.java       |  19 ++-
 .../PipeTransferTabletInsertionEventHandler.java   |   5 +
 .../PipeTransferTsFileInsertionEventHandler.java   |  35 ++++--
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |  17 +--
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  | 127 ++++++++++++---------
 .../thrift/sync/IoTDBSchemaRegionConnector.java    |  36 +++---
 .../execution/load/LoadTsFileRateLimiter.java      |  32 +++---
 .../queryengine/plan/planner/TreeModelPlanner.java |   4 -
 .../resources/conf/iotdb-common.properties         |  10 ++
 .../async/AsyncPipeDataTransferServiceClient.java  |   4 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   6 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   8 ++
 .../config/constant/PipeConnectorConstant.java     |   4 +
 .../pipe/connector/client/IoTDBSyncClient.java     |   7 ++
 .../pipe/connector/limiter/GlobalRateLimiter.java} |  55 ++++-----
 .../connector/limiter/PipeEndPointRateLimiter.java |  59 ++++++++++
 .../connector/protocol/IoTDBAirGapConnector.java   |  48 ++++++--
 .../pipe/connector/protocol/IoTDBConnector.java    |  45 +++++++-
 .../connector/protocol/IoTDBSslSyncConnector.java  |  24 ++--
 29 files changed, 536 insertions(+), 274 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ec09b546648..9ea34939864 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.Socket;
 import java.util.HashMap;
 import java.util.Objects;
 
@@ -56,9 +55,8 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
 
   @Override
   protected byte[] generateHandShakeV1Payload() throws IOException {
-    return compressIfNeeded(
-        PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
-            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+    return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
+        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
   }
 
   @Override
@@ -71,7 +69,7 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
-    return 
compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params));
+    return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
 
   @Override
@@ -107,7 +105,7 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
   @Override
   public void transfer(final Event event) throws Exception {
     final int socketIndex = nextSocketIndex();
-    final Socket socket = sockets.get(socketIndex);
+    final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
       if (event instanceof PipeConfigRegionWritePlanEvent) {
@@ -131,7 +129,8 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
   }
 
   private void doTransferWrapper(
-      final Socket socket, final PipeConfigRegionWritePlanEvent 
pipeConfigRegionWritePlanEvent)
+      final AirGapSocket socket,
+      final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
       throws PipeException, IOException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -147,13 +146,14 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
   }
 
   private void doTransfer(
-      final Socket socket, final PipeConfigRegionWritePlanEvent 
pipeConfigRegionWritePlanEvent)
+      final AirGapSocket socket,
+      final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
       throws PipeException, IOException {
     if (!send(
+        pipeConfigRegionWritePlanEvent.getPipeName(),
         socket,
-        compressIfNeeded(
-            PipeTransferConfigPlanReq.toTPipeTransferBytes(
-                pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) {
+        PipeTransferConfigPlanReq.toTPipeTransferBytes(
+            pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
       final String errorMessage =
           String.format(
               "Transfer config region write plan %s error. Socket: %s.",
@@ -170,7 +170,7 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
   }
 
   private void doTransferWrapper(
-      final Socket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
+      final AirGapSocket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
       throws PipeException, IOException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -186,29 +186,30 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
   }
 
   private void doTransfer(
-      final Socket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
+      final AirGapSocket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
       throws PipeException, IOException {
+    final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
     final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
     final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();
 
     // 1. Transfer snapshotFile, and template file if exists
-    transferFilePieces(snapshot, socket, true);
+    transferFilePieces(pipeName, snapshot, socket, true);
     if (Objects.nonNull(templateFile)) {
-      transferFilePieces(templateFile, socket, true);
+      transferFilePieces(pipeName, templateFile, socket, true);
     }
     // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
     if (!send(
+        pipeName,
         socket,
-        compressIfNeeded(
-            PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
-                // The pattern is surely Non-null
-                pipeConfigRegionSnapshotEvent.getPatternString(),
-                snapshot.getName(),
-                snapshot.length(),
-                Objects.nonNull(templateFile) ? templateFile.getName() : null,
-                Objects.nonNull(templateFile) ? templateFile.length() : 0,
-                pipeConfigRegionSnapshotEvent.getFileType(),
-                pipeConfigRegionSnapshotEvent.toSealTypeString())))) {
+        PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
+            // The pattern is surely Non-null
+            pipeConfigRegionSnapshotEvent.getPatternString(),
+            snapshot.getName(),
+            snapshot.length(),
+            Objects.nonNull(templateFile) ? templateFile.getName() : null,
+            Objects.nonNull(templateFile) ? templateFile.length() : 0,
+            pipeConfigRegionSnapshotEvent.getFileType(),
+            pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
       final String errorMessage =
           String.format("Seal config region snapshot %s error. Socket %s.", 
snapshot, socket);
       // Send handshake because we don't know whether the receiver side 
configNode
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index b199df4c390..00279bf22d9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.Pair;
@@ -123,13 +124,15 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
 
     final TPipeTransferResp resp;
     try {
-      resp =
-          clientAndStatus
-              .getLeft()
-              .pipeTransfer(
-                  compressIfNeeded(
-                      PipeTransferConfigPlanReq.toTPipeTransferReq(
-                          
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())));
+      final TPipeTransferReq req =
+          compressIfNeeded(
+              PipeTransferConfigPlanReq.toTPipeTransferReq(
+                  pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
+      rateLimitIfNeeded(
+          pipeConfigRegionWritePlanEvent.getPipeName(),
+          clientAndStatus.getLeft().getEndPoint(),
+          req.getBody().length);
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -177,32 +180,35 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
 
   private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
       throws PipeException, IOException {
+    final String pipeName = snapshotEvent.getPipeName();
     final File snapshotFile = snapshotEvent.getSnapshotFile();
     final File templateFile = snapshotEvent.getTemplateFile();
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
 
     // 1. Transfer snapshotFile, and template File if exists
-    transferFilePieces(snapshotFile, clientAndStatus, true);
+    transferFilePieces(pipeName, snapshotFile, clientAndStatus, true);
     if (Objects.nonNull(templateFile)) {
-      transferFilePieces(templateFile, clientAndStatus, true);
+      transferFilePieces(pipeName, templateFile, clientAndStatus, true);
     }
     // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
     final TPipeTransferResp resp;
     try {
-      resp =
-          clientAndStatus
-              .getLeft()
-              .pipeTransfer(
-                  compressIfNeeded(
-                      PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
-                          // The pattern is surely Non-null
-                          snapshotEvent.getPatternString(),
-                          snapshotFile.getName(),
-                          snapshotFile.length(),
-                          Objects.nonNull(templateFile) ? 
templateFile.getName() : null,
-                          Objects.nonNull(templateFile) ? 
templateFile.length() : 0,
-                          snapshotEvent.getFileType(),
-                          snapshotEvent.toSealTypeString())));
+      final TPipeTransferReq req =
+          compressIfNeeded(
+              PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
+                  // The pattern is surely Non-null
+                  snapshotEvent.getPatternString(),
+                  snapshotFile.getName(),
+                  snapshotFile.length(),
+                  Objects.nonNull(templateFile) ? templateFile.getName() : 
null,
+                  Objects.nonNull(templateFile) ? templateFile.length() : 0,
+                  snapshotEvent.getFileType(),
+                  snapshotEvent.toSealTypeString()));
+      rateLimitIfNeeded(
+          snapshotEvent.getPipeName(),
+          clientAndStatus.getLeft().getEndPoint(),
+          req.getBody().length);
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5e481330d09..2f4714d0cc5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1107,7 +1107,7 @@ public class IoTDBConfig {
 
   private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min
 
-  private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; // 
Bytes/s
+  private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
 
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during 
querying */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 82e2e09fbfa..6ac64b6c482 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -922,7 +922,7 @@ public class IoTDBDescriptor {
     conf.setLoadWriteThroughputBytesPerSecond(
         Double.parseDouble(
             properties.getProperty(
-                "load_write_throughput_bytes_per_sec",
+                "load_write_throughput_bytes_per_second",
                 String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", 
conf.getExtPipeDir()).trim());
@@ -1718,9 +1718,19 @@ public class IoTDBDescriptor {
       conf.setLoadWriteThroughputBytesPerSecond(
           Double.parseDouble(
               properties.getProperty(
-                  "load_write_throughput_bytes_per_sec",
+                  "load_write_throughput_bytes_per_second",
                   
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
 
+      // update pipe config
+      commonDescriptor
+          .getConfig()
+          .setPipeAllSinksRateLimitBytesPerSecond(
+              Double.parseDouble(
+                  properties.getProperty(
+                      "pipe_all_sinks_rate_limit_bytes_per_second",
+                      String.valueOf(
+                          
commonDescriptor.getConfig().getPipeAllSinksRateLimitBytesPerSecond()))));
+
       // update merge_threshold_of_explain_analyze
       conf.setMergeThresholdOfExplainAnalyze(
           Integer.parseInt(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
index 74b49ce7272..44d72b1e1c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
@@ -39,7 +39,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 public class PipeEventBatch implements AutoCloseable {
@@ -61,6 +63,9 @@ public class PipeEventBatch implements AutoCloseable {
   private final PipeMemoryBlock allocatedMemoryBlock;
   private long totalBufferSize = 0;
 
+  // Used to rate limit when transferring data
+  private final Map<String, Long> pipeName2BytesAccumulated = new HashMap<>();
+
   public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
     this.maxDelayInMs = maxDelayInMs;
     this.allocatedMemoryBlock =
@@ -112,6 +117,10 @@ public class PipeEventBatch implements AutoCloseable {
 
         final int bufferSize = buildTabletInsertionBuffer(event);
         totalBufferSize += bufferSize;
+        pipeName2BytesAccumulated.compute(
+            ((EnrichedEvent) event).getPipeName(),
+            (pipeName, bytesAccumulated) ->
+                bytesAccumulated == null ? bufferSize : bytesAccumulated + 
bufferSize);
 
         if (firstEventProcessingTime == Long.MIN_VALUE) {
           firstEventProcessingTime = System.currentTimeMillis();
@@ -137,6 +146,7 @@ public class PipeEventBatch implements AutoCloseable {
     firstEventProcessingTime = Long.MIN_VALUE;
 
     totalBufferSize = 0;
+    pipeName2BytesAccumulated.clear();
   }
 
   public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
@@ -160,6 +170,14 @@ public class PipeEventBatch implements AutoCloseable {
     return new ArrayList<>(requestCommitIds);
   }
 
+  public Map<String, Long> deepCopyPipeName2BytesAccumulated() {
+    return new HashMap<>(pipeName2BytesAccumulated);
+  }
+
+  public Map<String, Long> getPipeName2BytesAccumulated() {
+    return pipeName2BytesAccumulated;
+  }
+
   private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
       throws IOException, WALPipeException {
     final ByteBuffer buffer;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 01c7de76da2..f6e80a28917 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Set;
@@ -88,9 +87,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
 
   @Override
   protected byte[] generateHandShakeV1Payload() throws IOException {
-    return compressIfNeeded(
-        PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
-            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+    return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
+        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
   }
 
   @Override
@@ -103,11 +101,12 @@ public abstract class IoTDBDataNodeAirGapConnector 
extends IoTDBAirGapConnector
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
-    return 
compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params));
+    return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
 
   protected void doTransferWrapper(
-      final Socket socket, final PipeSchemaRegionWritePlanEvent 
pipeSchemaRegionWritePlanEvent)
+      final AirGapSocket socket,
+      final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
       throws PipeException, IOException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -123,13 +122,14 @@ public abstract class IoTDBDataNodeAirGapConnector 
extends IoTDBAirGapConnector
   }
 
   private void doTransfer(
-      final Socket socket, final PipeSchemaRegionWritePlanEvent 
pipeSchemaRegionWritePlanEvent)
+      final AirGapSocket socket,
+      final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
       throws PipeException, IOException {
     if (!send(
+        pipeSchemaRegionWritePlanEvent.getPipeName(),
         socket,
-        compressIfNeeded(
-            PipeTransferPlanNodeReq.toTPipeTransferBytes(
-                pipeSchemaRegionWritePlanEvent.getPlanNode())))) {
+        PipeTransferPlanNodeReq.toTPipeTransferBytes(
+            pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
       final String errorMessage =
           String.format(
               "Transfer data node write plan %s error. Socket: %s.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 84ca0b0ccdb..e83cd623fed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.Socket;
 import java.util.Objects;
 
 public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector {
@@ -69,7 +68,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
     }
 
     final int socketIndex = nextSocketIndex();
-    final Socket socket = sockets.get(socketIndex);
+    final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
@@ -106,7 +105,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
     }
 
     final int socketIndex = nextSocketIndex();
-    final Socket socket = sockets.get(socketIndex);
+    final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
       doTransferWrapper(socket, (PipeTsFileInsertionEvent) 
tsFileInsertionEvent);
@@ -125,7 +124,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   @Override
   public void transfer(final Event event) throws Exception {
     final int socketIndex = nextSocketIndex();
-    final Socket socket = sockets.get(socketIndex);
+    final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
       if (event instanceof PipeSchemaRegionWritePlanEvent) {
@@ -147,7 +146,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   }
 
   private void doTransferWrapper(
-      final Socket socket,
+      final AirGapSocket socket,
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException, WALPipeException, IOException {
     try {
@@ -164,19 +163,18 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   }
 
   private void doTransfer(
-      final Socket socket,
+      final AirGapSocket socket,
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException, WALPipeException, IOException {
     final InsertNode insertNode =
         pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
     final byte[] bytes =
-        compressIfNeeded(
-            Objects.isNull(insertNode)
-                ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
-                    pipeInsertNodeTabletInsertionEvent.getByteBuffer())
-                : 
PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode));
+        Objects.isNull(insertNode)
+            ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
+                pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+            : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
 
-    if (!send(socket, bytes)) {
+    if (!send(pipeInsertNodeTabletInsertionEvent.getPipeName(), socket, 
bytes)) {
       final String errorMessage =
           String.format(
               "Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: 
%s",
@@ -190,7 +188,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   }
 
   private void doTransferWrapper(
-      final Socket socket, final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
+      final AirGapSocket socket, final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException, IOException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -206,14 +204,14 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   }
 
   private void doTransfer(
-      final Socket socket, final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
+      final AirGapSocket socket, final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException, IOException {
     if (!send(
+        pipeRawTabletInsertionEvent.getPipeName(),
         socket,
-        compressIfNeeded(
-            PipeTransferTabletRawReq.toTPipeTransferBytes(
-                pipeRawTabletInsertionEvent.convertToTablet(),
-                pipeRawTabletInsertionEvent.isAligned())))) {
+        PipeTransferTabletRawReq.toTPipeTransferBytes(
+            pipeRawTabletInsertionEvent.convertToTablet(),
+            pipeRawTabletInsertionEvent.isAligned()))) {
       final String errorMessage =
           String.format(
               "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
@@ -227,7 +225,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   }
 
   private void doTransferWrapper(
-      final Socket socket, final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
+      final AirGapSocket socket, final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -243,22 +241,23 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   }
 
   private void doTransfer(
-      final Socket socket, final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
+      final AirGapSocket socket, final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
+    final String pipeName = pipeTsFileInsertionEvent.getPipeName();
     final File tsFile = pipeTsFileInsertionEvent.getTsFile();
     final String errorMessage = String.format("Seal file %s error. Socket 
%s.", tsFile, socket);
 
     // 1. Transfer file piece by piece, and mod if needed
     if (pipeTsFileInsertionEvent.isWithMod() && 
supportModsIfIsDataNodeReceiver) {
       final File modFile = pipeTsFileInsertionEvent.getModFile();
-      transferFilePieces(modFile, socket, true);
-      transferFilePieces(tsFile, socket, true);
+      transferFilePieces(pipeName, modFile, socket, true);
+      transferFilePieces(pipeName, tsFile, socket, true);
       // 2. Transfer file seal signal with mod, which means the file is 
transferred completely
       if (!send(
+          pipeName,
           socket,
-          compressIfNeeded(
-              PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
-                  modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())))) {
+          PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
+              modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()))) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
@@ -268,12 +267,12 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
         LOGGER.info("Successfully transferred file {}.", tsFile);
       }
     } else {
-      transferFilePieces(tsFile, socket, false);
+      transferFilePieces(pipeName, tsFile, socket, false);
       // 2. Transfer file seal signal without mod, which means the file is 
transferred completely
       if (!send(
+          pipeName,
           socket,
-          compressIfNeeded(
-              PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), 
tsFile.length())))) {
+          PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), 
tsFile.length()))) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index f390994e4f0..a97408266b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.Socket;
 import java.util.Objects;
 
 public class IoTDBSchemaRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector {
@@ -61,7 +60,7 @@ public class IoTDBSchemaRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnect
   @Override
   public void transfer(final Event event) throws Exception {
     final int socketIndex = nextSocketIndex();
-    final Socket socket = sockets.get(socketIndex);
+    final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
       if (event instanceof PipeSchemaRegionWritePlanEvent) {
@@ -85,7 +84,7 @@ public class IoTDBSchemaRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnect
   }
 
   private void doTransferWrapper(
-      final Socket socket, final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
+      final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
       throws PipeException, IOException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -101,29 +100,30 @@ public class IoTDBSchemaRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnect
   }
 
   private void doTransfer(
-      final Socket socket, final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
+      final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
       throws PipeException, IOException {
+    final String pipeName = pipeSchemaRegionSnapshotEvent.getPipeName();
     final File mtreeSnapshotFile = 
pipeSchemaRegionSnapshotEvent.getMTreeSnapshotFile();
     final File tagLogSnapshotFile = 
pipeSchemaRegionSnapshotEvent.getTagLogSnapshotFile();
 
     // 1. Transfer mTreeSnapshotFile, and tLog file if exists
-    transferFilePieces(mtreeSnapshotFile, socket, true);
+    transferFilePieces(pipeName, mtreeSnapshotFile, socket, true);
     if (Objects.nonNull(tagLogSnapshotFile)) {
-      transferFilePieces(tagLogSnapshotFile, socket, true);
+      transferFilePieces(pipeName, tagLogSnapshotFile, socket, true);
     }
     // 2. Transfer file seal signal, which means the snapshots is transferred 
completely
     if (!send(
+        pipeName,
         socket,
-        compressIfNeeded(
-            PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
-                // The pattern is surely Non-null
-                pipeSchemaRegionSnapshotEvent.getPatternString(),
-                mtreeSnapshotFile.getName(),
-                mtreeSnapshotFile.length(),
-                Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.getName() : null,
-                Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.length() : 0,
-                pipeSchemaRegionSnapshotEvent.getDatabaseName(),
-                pipeSchemaRegionSnapshotEvent.toSealTypeString())))) {
+        PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
+            // The pattern is surely Non-null
+            pipeSchemaRegionSnapshotEvent.getPatternString(),
+            mtreeSnapshotFile.getName(),
+            mtreeSnapshotFile.length(),
+            Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName() 
: null,
+            Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() 
: 0,
+            pipeSchemaRegionSnapshotEvent.getDatabaseName(),
+            pipeSchemaRegionSnapshotEvent.toSealTypeString()))) {
       final String errorMessage =
           String.format(
               "Seal schema region snapshot file %s and %s error. Socket %s.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 632d929e372..3edcedc31d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -196,11 +196,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
                 pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
 
         transfer(
-            // insertNode.getDevicePath() is null for InsertRowsNode
-            Objects.nonNull(insertNode) && 
Objects.nonNull(insertNode.getDevicePath())
-                ? insertNode.getDevicePath().getFullPath()
-                : null,
-            pipeTransferInsertNodeReqHandler);
+            // getDeviceId() may return null for InsertRowsNode
+            pipeInsertNodeTabletInsertionEvent.getDeviceId(), 
pipeTransferInsertNodeReqHandler);
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
             (PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -491,7 +488,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   @Override
   // synchronized to avoid close connector when transfer event
-  public synchronized void close() throws Exception {
+  public synchronized void close() {
     isClosed.set(true);
 
     retryConnector.close();
@@ -500,6 +497,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     if (tabletBatchBuilder != null) {
       tabletBatchBuilder.close();
     }
+
+    super.close();
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 88e41343950..7407b1b9554 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<TPipeTransferResp> {
@@ -50,7 +51,10 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
 
   private final List<Long> requestCommitIds;
   private final List<Event> events;
+  private final Map<String, Long> pipeName2BytesAccumulated;
+
   private final TPipeTransferReq req;
+  private final double reqCompressionRatio;
 
   private final IoTDBDataRegionAsyncConnector connector;
 
@@ -60,16 +64,25 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
     // Deep copy to keep Ids' and events' reference
     requestCommitIds = batch.deepCopyRequestCommitIds();
     events = batch.deepCopyEvents();
+    pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
+
+    final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq();
     req =
         connector.isRpcCompressionEnabled()
-            ? batch.toTPipeTransferReq()
-            : PipeTransferCompressedReq.toTPipeTransferReq(
-                batch.toTPipeTransferReq(), connector.getCompressors());
+            ? PipeTransferCompressedReq.toTPipeTransferReq(
+                uncompressedReq, connector.getCompressors())
+            : uncompressedReq;
+    reqCompressionRatio = (double) req.getBody().length / 
uncompressedReq.getBody().length;
 
     this.connector = connector;
   }
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
+    for (final Map.Entry<String, Long> entry : 
pipeName2BytesAccumulated.entrySet()) {
+      connector.rateLimitIfNeeded(
+          entry.getKey(), client.getEndPoint(), (long) (entry.getValue() * 
reqCompressionRatio));
+    }
+
     client.pipeTransfer(req, this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 5d834440d7f..94fa6d5ac1a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -53,6 +53,11 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
+    if (event instanceof EnrichedEvent) {
+      connector.rateLimitIfNeeded(
+          ((EnrichedEvent) event).getPipeName(), client.getEndPoint(), 
req.getBody().length);
+    }
+
     doTransfer(client, req);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 8d02aeb8fa8..e7e374443c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -122,15 +122,22 @@ public class PipeTransferTsFileInsertionEventHandler
         transfer(clientManager, client);
       } else if (currentFile == tsFile) {
         isSealSignalSent.set(true);
-        client.pipeTransfer(
-            PipeTransferCompressedReq.toTPipeTransferReq(
-                transferMod
-                    ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
-                        modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
-                    : PipeTransferTsFileSealReq.toTPipeTransferReq(
-                        tsFile.getName(), tsFile.length()),
-                connector.getCompressors()),
-            this);
+
+        final TPipeTransferReq uncompressedReq =
+            transferMod
+                ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+                    modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
+                : 
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
+        final TPipeTransferReq req =
+            connector.isRpcCompressionEnabled()
+                ? PipeTransferCompressedReq.toTPipeTransferReq(
+                    uncompressedReq, connector.getCompressors())
+                : uncompressedReq;
+
+        connector.rateLimitIfNeeded(
+            event.getPipeName(), client.getEndPoint(), req.getBody().length);
+
+        client.pipeTransfer(req, this);
       }
       return;
     }
@@ -145,12 +152,16 @@ public class PipeTransferTsFileInsertionEventHandler
                 currentFile.getName(), position, payload)
             : PipeTransferTsFilePieceReq.toTPipeTransferReq(
                 currentFile.getName(), position, payload);
-    client.pipeTransfer(
+    final TPipeTransferReq req =
         connector.isRpcCompressionEnabled()
             ? PipeTransferCompressedReq.toTPipeTransferReq(
                 uncompressedReq, connector.getCompressors())
-            : uncompressedReq,
-        this);
+            : uncompressedReq;
+
+    connector.rateLimitIfNeeded(event.getPipeName(), client.getEndPoint(), 
req.getBody().length);
+
+    client.pipeTransfer(req, this);
+
     position += readLength;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 33c97ce27f2..aef655c3240 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.Pair;
@@ -113,13 +114,15 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
 
     final TPipeTransferResp resp;
     try {
-      resp =
-          clientAndStatus
-              .getLeft()
-              .pipeTransfer(
-                  compressIfNeeded(
-                      PipeTransferPlanNodeReq.toTPipeTransferReq(
-                          pipeSchemaRegionWritePlanEvent.getPlanNode())));
+      final TPipeTransferReq req =
+          compressIfNeeded(
+              PipeTransferPlanNodeReq.toTPipeTransferReq(
+                  pipeSchemaRegionWritePlanEvent.getPlanNode()));
+      rateLimitIfNeeded(
+          pipeSchemaRegionWritePlanEvent.getPipeName(),
+          clientAndStatus.getLeft().getEndPoint(),
+          req.getBody().length);
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 8ee65ef94ed..cf93bac6ced 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.Pair;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 
 public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
@@ -176,10 +178,23 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     final TPipeTransferResp resp;
     try {
-      resp =
-          clientAndStatus
-              .getLeft()
-              
.pipeTransfer(compressIfNeeded(batchToTransfer.toTPipeTransferReq()));
+      final TPipeTransferReq uncompressedReq = 
batchToTransfer.toTPipeTransferReq();
+      final long uncompressedSize = uncompressedReq.getBody().length;
+
+      final TPipeTransferReq req = compressIfNeeded(uncompressedReq);
+      final long compressedSize = req.getBody().length;
+
+      final double compressionRatio = (double) compressedSize / 
uncompressedSize;
+
+      for (final Map.Entry<String, Long> entry :
+          batchToTransfer.getPipeName2BytesAccumulated().entrySet()) {
+        rateLimitIfNeeded(
+            entry.getKey(),
+            clientAndStatus.getLeft().getEndPoint(),
+            (long) (entry.getValue() * compressionRatio));
+      }
+
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -230,30 +245,26 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
   private void doTransfer(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException {
-    final InsertNode insertNode;
-    Pair<IoTDBSyncClient, Boolean> clientAndStatus = null;
     final TPipeTransferResp resp;
 
+    Pair<IoTDBSyncClient, Boolean> clientAndStatus = null;
     try {
-      insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
       // getDeviceId() may return null for InsertRowsNode, will be equal to 
getClient(null)
       clientAndStatus = 
clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId());
-      if (insertNode != null) {
-        resp =
-            clientAndStatus
-                .getLeft()
-                .pipeTransfer(
-                    compressIfNeeded(
-                        
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
-      } else {
-        resp =
-            clientAndStatus
-                .getLeft()
-                .pipeTransfer(
-                    compressIfNeeded(
-                        PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                            
pipeInsertNodeTabletInsertionEvent.getByteBuffer())));
-      }
+
+      final InsertNode insertNode =
+          pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+      final TPipeTransferReq req =
+          compressIfNeeded(
+              insertNode != null
+                  ? 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)
+                  : PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                      pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
+      rateLimitIfNeeded(
+          pipeInsertNodeTabletInsertionEvent.getPipeName(),
+          clientAndStatus.getLeft().getEndPoint(),
+          req.getBody().length);
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       if (clientAndStatus != null) {
         clientAndStatus.setRight(false);
@@ -276,10 +287,11 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
               pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
           pipeInsertNodeTabletInsertionEvent.toString());
     }
-    // insertNode.getDevicePath() is null for InsertRowsNode
-    if (insertNode != null && insertNode.getDevicePath() != null && 
status.isSetRedirectNode()) {
+    // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for 
InsertRowsNode
+    if (Objects.nonNull(pipeInsertNodeTabletInsertionEvent.getDeviceId())
+        && status.isSetRedirectNode()) {
       clientManager.updateLeaderCache(
-          insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
+          pipeInsertNodeTabletInsertionEvent.getDeviceId(), 
status.getRedirectNode());
     }
   }
 
@@ -305,14 +317,16 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     final TPipeTransferResp resp;
 
     try {
-      resp =
-          clientAndStatus
-              .getLeft()
-              .pipeTransfer(
-                  compressIfNeeded(
-                      PipeTransferTabletRawReq.toTPipeTransferReq(
-                          pipeRawTabletInsertionEvent.convertToTablet(),
-                          pipeRawTabletInsertionEvent.isAligned())));
+      final TPipeTransferReq req =
+          compressIfNeeded(
+              PipeTransferTabletRawReq.toTPipeTransferReq(
+                  pipeRawTabletInsertionEvent.convertToTablet(),
+                  pipeRawTabletInsertionEvent.isAligned()));
+      rateLimitIfNeeded(
+          pipeRawTabletInsertionEvent.getPipeName(),
+          clientAndStatus.getLeft().getEndPoint(),
+          req.getBody().length);
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -356,6 +370,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
   private void doTransfer(final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
+    final String pipeName = pipeTsFileInsertionEvent.getPipeName();
     final File tsFile = pipeTsFileInsertionEvent.getTsFile();
     final File modFile = pipeTsFileInsertionEvent.getModFile();
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
@@ -363,20 +378,19 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
     if (pipeTsFileInsertionEvent.isWithMod() && 
clientManager.supportModsIfIsDataNodeReceiver()) {
-      transferFilePieces(modFile, clientAndStatus, true);
-      transferFilePieces(tsFile, clientAndStatus, true);
+      transferFilePieces(pipeName, modFile, clientAndStatus, true);
+      transferFilePieces(pipeName, tsFile, clientAndStatus, true);
       // 2. Transfer file seal signal with mod, which means the file is 
transferred completely
       try {
-        resp =
-            clientAndStatus
-                .getLeft()
-                .pipeTransfer(
-                    compressIfNeeded(
-                        PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
-                            modFile.getName(),
-                            modFile.length(),
-                            tsFile.getName(),
-                            tsFile.length())));
+        final TPipeTransferReq req =
+            compressIfNeeded(
+                PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+                    modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()));
+        rateLimitIfNeeded(
+            pipeTsFileInsertionEvent.getPipeName(),
+            clientAndStatus.getLeft().getEndPoint(),
+            req.getBody().length);
+        resp = clientAndStatus.getLeft().pipeTransfer(req);
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
         clientManager.adjustTimeoutIfNecessary(e);
@@ -385,16 +399,17 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
             e);
       }
     } else {
-      transferFilePieces(tsFile, clientAndStatus, false);
+      transferFilePieces(pipeName, tsFile, clientAndStatus, false);
       // 2. Transfer file seal signal without mod, which means the file is 
transferred completely
       try {
-        resp =
-            clientAndStatus
-                .getLeft()
-                .pipeTransfer(
-                    compressIfNeeded(
-                        PipeTransferTsFileSealReq.toTPipeTransferReq(
-                            tsFile.getName(), tsFile.length())));
+        final TPipeTransferReq req =
+            compressIfNeeded(
+                PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), 
tsFile.length()));
+        rateLimitIfNeeded(
+            pipeTsFileInsertionEvent.getPipeName(),
+            clientAndStatus.getLeft().getEndPoint(),
+            req.getBody().length);
+        resp = clientAndStatus.getLeft().pipeTransfer(req);
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
         clientManager.adjustTimeoutIfNecessary(e);
@@ -419,10 +434,10 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
   @Override
   public void close() {
-    super.close();
-
     if (tabletBatchBuilder != null) {
       tabletBatchBuilder.close();
     }
+
+    super.close();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 2b2cba03f20..210bbec3dc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.Pair;
@@ -87,32 +88,35 @@ public class IoTDBSchemaRegionConnector extends 
IoTDBDataNodeSyncConnector {
 
   private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent)
       throws PipeException, IOException {
+    final String pipeName = snapshotEvent.getPipeName();
     final File mTreeSnapshotFile = snapshotEvent.getMTreeSnapshotFile();
     final File tagLogSnapshotFile = snapshotEvent.getTagLogSnapshotFile();
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
     final TPipeTransferResp resp;
 
     // 1. Transfer mTreeSnapshotFile, and tLog file if exists
-    transferFilePieces(mTreeSnapshotFile, clientAndStatus, true);
+    transferFilePieces(pipeName, mTreeSnapshotFile, clientAndStatus, true);
     if (Objects.nonNull(tagLogSnapshotFile)) {
-      transferFilePieces(tagLogSnapshotFile, clientAndStatus, true);
+      transferFilePieces(pipeName, tagLogSnapshotFile, clientAndStatus, true);
     }
     // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
     try {
-      resp =
-          clientAndStatus
-              .getLeft()
-              .pipeTransfer(
-                  compressIfNeeded(
-                      PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
-                          // The pattern is surely Non-null
-                          snapshotEvent.getPatternString(),
-                          mTreeSnapshotFile.getName(),
-                          mTreeSnapshotFile.length(),
-                          Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.getName() : null,
-                          Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.length() : 0,
-                          snapshotEvent.getDatabaseName(),
-                          snapshotEvent.toSealTypeString())));
+      final TPipeTransferReq req =
+          compressIfNeeded(
+              PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
+                  // The pattern is surely Non-null
+                  snapshotEvent.getPatternString(),
+                  mTreeSnapshotFile.getName(),
+                  mTreeSnapshotFile.length(),
+                  Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.getName() : null,
+                  Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.length() : 0,
+                  snapshotEvent.getDatabaseName(),
+                  snapshotEvent.toSealTypeString()));
+      rateLimitIfNeeded(
+          snapshotEvent.getPipeName(),
+          clientAndStatus.getLeft().getEndPoint(),
+          req.getBody().length);
+      resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
index 8aa7841a8d0..ce7046bf5ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -33,22 +33,19 @@ public class LoadTsFileRateLimiter {
       new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
   private final RateLimiter loadWriteRateLimiter;
 
-  private LoadTsFileRateLimiter() {
-    final double throughputBytesPerSecondLimit = 
throughputBytesPerSecond.get();
-    loadWriteRateLimiter =
-        // if throughput <= 0, disable rate limiting
-        throughputBytesPerSecondLimit <= 0
-            ? RateLimiter.create(Double.MAX_VALUE)
-            : RateLimiter.create(throughputBytesPerSecondLimit);
-  }
-
   public void acquire(long bytes) {
-    if (throughputBytesPerSecond.get() != 
CONFIG.getLoadWriteThroughputBytesPerSecond()) {
-      final double newThroughputBytesPerSecond = 
CONFIG.getLoadWriteThroughputBytesPerSecond();
-      throughputBytesPerSecond.set(newThroughputBytesPerSecond);
+    final double throughputBytesPerSecondLimit = 
CONFIG.getLoadWriteThroughputBytesPerSecond();
+
+    if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
+      throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
       loadWriteRateLimiter.setRate(
           // if throughput <= 0, disable rate limiting
-          newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE : 
newThroughputBytesPerSecond);
+          throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : 
throughputBytesPerSecondLimit);
+    }
+
+    // For performance, we don't need to acquire rate limiter if throughput <= 0
+    if (throughputBytesPerSecondLimit <= 0) {
+      return;
     }
 
     while (bytes > 0) {
@@ -64,6 +61,15 @@ public class LoadTsFileRateLimiter {
 
   //////////////////////////// Singleton ////////////////////////////
 
+  private LoadTsFileRateLimiter() {
+    final double throughputBytesPerSecondLimit = 
throughputBytesPerSecond.get();
+    loadWriteRateLimiter =
+        // if throughput <= 0, disable rate limiting
+        throughputBytesPerSecondLimit <= 0
+            ? RateLimiter.create(Double.MAX_VALUE)
+            : RateLimiter.create(throughputBytesPerSecondLimit);
+  }
+
   private static class LoadTsFileRateLimiterHolder {
 
     private static final LoadTsFileRateLimiter INSTANCE = new 
LoadTsFileRateLimiter();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 4ea3048f2c3..843374ec49c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -46,16 +46,12 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 public class TreeModelPlanner implements IPlanner {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TreeModelPlanner.class);
 
   private final Statement statement;
 
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 827eeca2ad7..099f76fb3b7 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -965,6 +965,11 @@ data_replication_factor=1
 # The port for the server to receive pipe data through air gap.
 # pipe_air_gap_receiver_port=9780
 
+# The total bytes that all pipe sinks can transfer per second.
+# When given a value less than or equal to 0, it means no limit.
+# Default value is -1, which means no limit.
+# pipe_all_sinks_rate_limit_bytes_per_second=-1
+
 ####################
 ### RatisConsensus Configuration
 ####################
@@ -1104,3 +1109,8 @@ data_replication_factor=1
 # Load clean up task is used to clean up the unsuccessful loaded tsfile after 
a certain period of time.
 # The parameter is the delay time after an unsuccessful load operation (in 
seconds).
 # load_clean_up_task_execution_delay_time_seconds=1800
+
+# The maximum bytes per second of disk write throughput when loading tsfile.
+# When given a value less than or equal to 0, it means no limit.
+# Default value is -1, which means no limit.
+# load_write_throughput_bytes_per_second=-1
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 4572516fbee..91fa97fc82c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -165,6 +165,10 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     return endpoint.getPort();
   }
 
+  public TEndPoint getEndPoint() {
+    return endpoint;
+  }
+
   @Override
   public String toString() {
     return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}", 
endpoint, id);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index fc437a3de97..c03cf539711 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -194,6 +194,8 @@ public class CommonConfig {
   private int pipeAsyncConnectorSelectorNumber = 4;
   private int pipeAsyncConnectorMaxClientNumber = 16;
 
+  private double pipeAllSinksRateLimitBytesPerSecond = -1;
+
   private boolean isSeperatedPipeHeartbeatEnabled = true;
   private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -1007,6 +1009,14 @@ public class CommonConfig {
     this.pipeRemainingTimeCommitRateSmoothingFactor = 
pipeRemainingTimeCommitRateSmoothingFactor;
   }
 
+  public double getPipeAllSinksRateLimitBytesPerSecond() {
+    return pipeAllSinksRateLimitBytesPerSecond;
+  }
+
+  public void setPipeAllSinksRateLimitBytesPerSecond(double 
pipeAllSinksRateLimitBytesPerSecond) {
+    this.pipeAllSinksRateLimitBytesPerSecond = 
pipeAllSinksRateLimitBytesPerSecond;
+  }
+
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
     return twoStageAggregateMaxCombinerLiveTimeInMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index efb3420b62c..b472b155aef 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -377,6 +377,12 @@ public class CommonDescriptor {
                         "pipe_async_connector_max_client_number",
                         
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));
 
+    config.setPipeAllSinksRateLimitBytesPerSecond(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_all_sinks_rate_limit_bytes_per_second",
+                
String.valueOf(config.getPipeAllSinksRateLimitBytesPerSecond()))));
+
     config.setSeperatedPipeHeartbeatEnabled(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 41331a822dd..dfd48e7eccb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -123,6 +123,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
   }
 
+  public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+    return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
+  }
+
   public float getPipeLeaderCacheMemoryUsagePercentage() {
     return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
   }
@@ -324,6 +328,10 @@ public class PipeConfig {
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
 
+    LOGGER.info(
+        "PipeAllConnectorsRateLimitBytesPerSecond: {}",
+        getPipeAllConnectorsRateLimitBytesPerSecond());
+
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
         "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index c15fc4db189..5e73c4d54d8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -177,6 +177,10 @@ public class PipeConnectorConstant {
                   CONNECTOR_COMPRESSOR_ZSTD,
                   CONNECTOR_COMPRESSOR_LZMA2)));
 
+  public static final String CONNECTOR_RATE_LIMIT_KEY = 
"connector.rate-limit-bytes-per-second";
+  public static final String SINK_RATE_LIMIT_KEY = 
"sink.rate-limit-bytes-per-second";
+  public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
+
   public static final String SINK_TOPIC_KEY = "sink.topic";
   public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index 3ab7a156b74..f15934afd5e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.connector.client;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
@@ -33,6 +34,7 @@ public class IoTDBSyncClient extends IClientRPCService.Client
 
   private final String ipAddress;
   private final int port;
+  private final TEndPoint endPoint;
 
   public IoTDBSyncClient(
       ThriftClientProperty property,
@@ -57,6 +59,7 @@ public class IoTDBSyncClient extends IClientRPCService.Client
                         ipAddress, port, property.getConnectionTimeoutMs())));
     this.ipAddress = ipAddress;
     this.port = port;
+    this.endPoint = new TEndPoint(ipAddress, port);
     final TTransport transport = getInputProtocol().getTransport();
     if (!transport.isOpen()) {
       transport.open();
@@ -71,6 +74,10 @@ public class IoTDBSyncClient extends IClientRPCService.Client
     return port;
   }
 
+  public TEndPoint getEndPoint() {
+    return endPoint;
+  }
+
   public void setTimeout(int timeout) {
     ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
similarity index 51%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
index 8aa7841a8d0..575731d04a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
@@ -17,63 +17,54 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.commons.pipe.connector.limiter;
 
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 import com.google.common.util.concurrent.AtomicDouble;
 import com.google.common.util.concurrent.RateLimiter;
 
-public class LoadTsFileRateLimiter {
+/** This is a global rate limiter for all connectors. */
+public class GlobalRateLimiter {
 
-  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final PipeConfig CONFIG = PipeConfig.getInstance();
 
   private final AtomicDouble throughputBytesPerSecond =
-      new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
-  private final RateLimiter loadWriteRateLimiter;
+      new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond());
+  private final RateLimiter rateLimiter;
 
-  private LoadTsFileRateLimiter() {
+  public GlobalRateLimiter() {
     final double throughputBytesPerSecondLimit = 
throughputBytesPerSecond.get();
-    loadWriteRateLimiter =
-        // if throughput <= 0, disable rate limiting
+    rateLimiter =
         throughputBytesPerSecondLimit <= 0
             ? RateLimiter.create(Double.MAX_VALUE)
             : RateLimiter.create(throughputBytesPerSecondLimit);
   }
 
   public void acquire(long bytes) {
-    if (throughputBytesPerSecond.get() != 
CONFIG.getLoadWriteThroughputBytesPerSecond()) {
-      final double newThroughputBytesPerSecond = 
CONFIG.getLoadWriteThroughputBytesPerSecond();
-      throughputBytesPerSecond.set(newThroughputBytesPerSecond);
-      loadWriteRateLimiter.setRate(
+    final double throughputBytesPerSecondLimit =
+        CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+
+    if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
+      throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
+      rateLimiter.setRate(
           // if throughput <= 0, disable rate limiting
-          newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE : 
newThroughputBytesPerSecond);
+          throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : 
throughputBytesPerSecondLimit);
+    }
+
+    // For performance, we don't need to acquire rate limiter if throughput <= 0
+    if (throughputBytesPerSecondLimit <= 0) {
+      return;
     }
 
     while (bytes > 0) {
       if (bytes > Integer.MAX_VALUE) {
-        loadWriteRateLimiter.acquire(Integer.MAX_VALUE);
+        rateLimiter.acquire(Integer.MAX_VALUE);
         bytes -= Integer.MAX_VALUE;
       } else {
-        loadWriteRateLimiter.acquire((int) bytes);
+        rateLimiter.acquire((int) bytes);
         return;
       }
     }
   }
-
-  //////////////////////////// Singleton ////////////////////////////
-
-  private static class LoadTsFileRateLimiterHolder {
-
-    private static final LoadTsFileRateLimiter INSTANCE = new 
LoadTsFileRateLimiter();
-
-    private LoadTsFileRateLimiterHolder() {
-      // Prevent instantiation
-    }
-  }
-
-  public static LoadTsFileRateLimiter getInstance() {
-    return LoadTsFileRateLimiterHolder.INSTANCE;
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
new file mode 100644
index 00000000000..8b151644576
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.limiter;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeEndPointRateLimiter {
+
+  private final double bytesPerSecondLimit;
+
+  private final ConcurrentMap<TEndPoint, RateLimiter> endPointRateLimiterMap;
+
+  public PipeEndPointRateLimiter(double bytesPerSecondLimit) {
+    this.bytesPerSecondLimit = bytesPerSecondLimit;
+    endPointRateLimiterMap = new ConcurrentHashMap<>();
+  }
+
+  public void acquire(final TEndPoint endPoint, long bytes) {
+    if (endPoint == null) {
+      return;
+    }
+
+    final RateLimiter rateLimiter =
+        endPointRateLimiterMap.computeIfAbsent(
+            endPoint, e -> RateLimiter.create(bytesPerSecondLimit));
+
+    while (bytes > 0) {
+      if (bytes > Integer.MAX_VALUE) {
+        rateLimiter.acquire(Integer.MAX_VALUE);
+        bytes -= Integer.MAX_VALUE;
+      } else {
+        rateLimiter.acquire((int) bytes);
+        return;
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index ec2d22ddfa6..7aac1ffc90d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.connector.protocol;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant;
@@ -57,11 +58,29 @@ import static 
org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
 
 public abstract class IoTDBAirGapConnector extends IoTDBConnector {
 
+  protected static class AirGapSocket extends Socket {
+
+    private final TEndPoint endPoint;
+
+    public AirGapSocket(String ip, int port) {
+      this.endPoint = new TEndPoint(ip, port);
+    }
+
+    public TEndPoint getEndPoint() {
+      return endPoint;
+    }
+
+    @Override
+    public String toString() {
+      return "AirGapSocket{" + "endPoint=" + endPoint + "} (" + 
super.toString() + ")";
+    }
+  }
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBAirGapConnector.class);
 
   protected static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
-  protected final List<Socket> sockets = new ArrayList<>();
+  protected final List<AirGapSocket> sockets = new ArrayList<>();
   protected final List<Boolean> isSocketAlive = new ArrayList<>();
 
   private LoadBalancer loadBalancer;
@@ -147,7 +166,7 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
         }
       }
 
-      final Socket socket = new Socket();
+      final AirGapSocket socket = new AirGapSocket(ip, port);
 
       try {
         socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
@@ -176,7 +195,7 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
         String.format("All target servers %s are not available.", nodeUrls));
   }
 
-  protected void sendHandshakeReq(Socket socket) throws IOException {
+  protected void sendHandshakeReq(AirGapSocket socket) throws IOException {
     socket.setSoTimeout(handshakeTimeoutMs);
     // Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to 
handshake by
     // PipeTransferHandshakeV1Req. If failed again, throw 
PipeConnectionException.
@@ -208,7 +227,8 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
     }
   }
 
-  protected void transferFilePieces(File file, Socket socket, boolean 
isMultiFile)
+  protected void transferFilePieces(
+      String pipeName, File file, AirGapSocket socket, boolean isMultiFile)
       throws PipeException, IOException {
     final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
@@ -225,11 +245,11 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
                 ? readBuffer
                 : Arrays.copyOfRange(readBuffer, 0, readLength);
         if (!send(
+            pipeName,
             socket,
-            compressIfNeeded(
-                isMultiFile
-                    ? getTransferMultiFilePieceBytes(file.getName(), position, 
payload)
-                    : getTransferSingleFilePieceBytes(file.getName(), 
position, payload)))) {
+            isMultiFile
+                ? getTransferMultiFilePieceBytes(file.getName(), position, 
payload)
+                : getTransferSingleFilePieceBytes(file.getName(), position, 
payload))) {
           final String errorMessage =
               String.format("Transfer file %s error. Socket %s.", file, 
socket);
           if (mayNeedHandshakeWhenFail()) {
@@ -261,11 +281,15 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
     return loadBalancer.nextSocketIndex();
   }
 
-  protected boolean send(Socket socket, byte[] bytes) throws IOException {
+  protected boolean send(String pipeName, AirGapSocket socket, byte[] bytes) 
throws IOException {
     if (!socket.isConnected()) {
       return false;
     }
 
+    bytes = compressIfNeeded(bytes);
+
+    rateLimitIfNeeded(pipeName, socket.getEndPoint(), bytes.length);
+
     final BufferedOutputStream outputStream = new 
BufferedOutputStream(socket.getOutputStream());
     bytes = enrichWithLengthAndChecksum(bytes);
     outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);
@@ -276,6 +300,10 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
     return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
   }
 
+  protected boolean send(AirGapSocket socket, byte[] bytes) throws IOException 
{
+    return send(null, socket, bytes);
+  }
+
   private byte[] enrichWithLengthAndChecksum(byte[] bytes) {
     // Length of checksum and bytes payload
     final byte[] length = BytesUtils.intToBytes(bytes.length + LONG_LEN);
@@ -309,6 +337,8 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
         isSocketAlive.set(i, false);
       }
     }
+
+    super.close();
   }
 
   /////////////////////// Strategies for load balance 
//////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 751f0dec05f..f65daf86d4f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
 import 
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
+import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter;
+import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -40,8 +42,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -65,6 +69,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
@@ -77,6 +83,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
 
@@ -91,8 +98,13 @@ public abstract class IoTDBConnector implements 
PipeConnector {
 
   protected String loadBalanceStrategy;
 
-  protected boolean isRpcCompressionEnabled;
-  protected final List<PipeCompressor> compressors = new ArrayList<>();
+  private boolean isRpcCompressionEnabled;
+  private final List<PipeCompressor> compressors = new ArrayList<>();
+
+  private static final Map<String, PipeEndPointRateLimiter> 
PIPE_END_POINT_RATE_LIMITER_MAP =
+      new ConcurrentHashMap<>();
+  private double endPointRateLimitBytesPerSecond = -1;
+  private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new 
GlobalRateLimiter();
 
   protected boolean isTabletBatchModeEnabled = true;
 
@@ -169,6 +181,17 @@ public abstract class IoTDBConnector implements 
PipeConnector {
         compressors.size());
     isRpcCompressionEnabled = !compressors.isEmpty();
 
+    endPointRateLimitBytesPerSecond =
+        parameters.getDoubleOrDefault(
+            Arrays.asList(CONNECTOR_RATE_LIMIT_KEY, SINK_RATE_LIMIT_KEY),
+            CONNECTOR_RATE_LIMIT_DEFAULT_VALUE);
+    validator.validate(
+        arg -> endPointRateLimitBytesPerSecond <= Double.MAX_VALUE,
+        String.format(
+            "Rate limit should be in the range (0, %f], but got %f.",
+            Double.MAX_VALUE, endPointRateLimitBytesPerSecond),
+        endPointRateLimitBytesPerSecond);
+
     validator.validate(
         arg -> arg.equals("retry") || arg.equals("ignore"),
         String.format(
@@ -302,6 +325,12 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     }
   }
 
+  @Override
+  public void close() {
+    // TODO: Not all the limiters should be closed here, but it's fine for now.
+    PIPE_END_POINT_RATE_LIMITER_MAP.clear();
+  }
+
   protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws 
IOException {
     return isRpcCompressionEnabled
         ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
@@ -322,6 +351,18 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     return compressors;
   }
 
+  public void rateLimitIfNeeded(
+      final String pipeName, final TEndPoint endPoint, final long bytesLength) 
{
+    if (pipeName != null && endPointRateLimitBytesPerSecond > 0) {
+      PIPE_END_POINT_RATE_LIMITER_MAP
+          .computeIfAbsent(
+              pipeName, endpoint -> new 
PipeEndPointRateLimiter(endPointRateLimitBytesPerSecond))
+          .acquire(endPoint, bytesLength);
+    }
+
+    GLOBAL_RATE_LIMITER.acquire(bytesLength);
+  }
+
   public PipeReceiverStatusHandler statusHandler() {
     return receiverStatusHandler;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 4e1ab4d4789..713f25df29e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.tsfile.utils.Pair;
@@ -143,7 +144,10 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
   }
 
   protected void transferFilePieces(
-      File file, Pair<IoTDBSyncClient, Boolean> clientAndStatus, boolean 
isMultiFile)
+      String pipeName,
+      File file,
+      Pair<IoTDBSyncClient, Boolean> clientAndStatus,
+      boolean isMultiFile)
       throws PipeException, IOException {
     final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
@@ -161,16 +165,16 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
                 : Arrays.copyOfRange(readBuffer, 0, readLength);
         final PipeTransferFilePieceResp resp;
         try {
+          final TPipeTransferReq req =
+              compressIfNeeded(
+                  isMultiFile
+                      ? getTransferMultiFilePieceReq(file.getName(), position, 
payLoad)
+                      : getTransferSingleFilePieceReq(file.getName(), 
position, payLoad));
+          rateLimitIfNeeded(
+              pipeName, clientAndStatus.getLeft().getEndPoint(), 
req.getBody().length);
           resp =
               PipeTransferFilePieceResp.fromTPipeTransferResp(
-                  clientAndStatus
-                      .getLeft()
-                      .pipeTransfer(
-                          compressIfNeeded(
-                              isMultiFile
-                                  ? 
getTransferMultiFilePieceReq(file.getName(), position, payLoad)
-                                  : getTransferSingleFilePieceReq(
-                                      file.getName(), position, payLoad))));
+                  clientAndStatus.getLeft().pipeTransfer(req));
         } catch (Exception e) {
           clientAndStatus.setRight(false);
           throw new PipeConnectionException(
@@ -219,5 +223,7 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
     if (clientManager != null) {
       clientManager.close();
     }
+
+    super.close();
   }
 }


Reply via email to