This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d1cffae9a1e0d83ef3835de831b4cc4c6f6e53b0
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 5 18:06:00 2025 +0800

    Revert Pipe: Added rate limiter for tsFile sending
    
    (cherry picked from commit 73a504814f549cd15648072ae744837c04baf913)
---
 .../sink/protocol/IoTDBConfigRegionAirGapSink.java |  5 --
 .../pipe/sink/protocol/IoTDBConfigRegionSink.java  |  5 --
 .../task/builder/PipeDataNodeTaskBuilder.java      | 66 ++++++++--------------
 .../pipe/metric/overview/PipeResourceMetrics.java  | 13 -----
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java | 31 ----------
 .../airgap/IoTDBSchemaRegionAirGapSink.java        |  5 --
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 14 -----
 .../async/handler/PipeTransferTsFileHandler.java   |  6 --
 .../thrift/sync/IoTDBDataRegionSyncSink.java       | 21 -------
 .../thrift/sync/IoTDBSchemaRegionSink.java         |  5 --
 .../source/dataregion/IoTDBDataRegionSource.java   | 10 +---
 .../load/limiter/LoadTsFileRateLimiter.java        | 64 ++++++++++++++++++---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 16 ------
 .../iotdb/commons/pipe/config/PipeConfig.java      |  6 --
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  6 --
 .../pipe/config/constant/PipeSinkConstant.java     |  5 --
 .../pipe/sink/limiter/GlobalRPCRateLimiter.java    | 33 -----------
 .../pipe/sink/limiter/GlobalRateLimiter.java       | 14 ++---
 .../pipe/sink/limiter/TsFileSendRateLimiter.java   | 47 ---------------
 .../pipe/sink/protocol/IoTDBAirGapSink.java        |  3 -
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  4 +-
 .../pipe/sink/protocol/IoTDBSslSyncSink.java       |  3 -
 .../iotdb/commons/service/metric/enums/Metric.java |  1 -
 23 files changed, 92 insertions(+), 291 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
index 5f32bae8c27..c9c8ded4cf0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
@@ -88,11 +88,6 @@ public class IoTDBConfigRegionAirGapSink extends IoTDBAirGapSink {
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
 
-  @Override
-  protected void mayLimitRateAndRecordIO(final long requiredBytes) {
-    // Do nothing
-  }
-
   @Override
   protected boolean mayNeedHandshakeWhenFail() {
     return true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index e846410b5e7..a1e9239ccb2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -103,11 +103,6 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
     return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName, 
position, payLoad);
   }
 
-  @Override
-  protected void mayLimitRateAndRecordIO(final long requiredBytes) {
-    // Do nothing
-  }
-
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 2b6f18c167d..7f0b6b4ff18 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -56,11 +56,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
 
 public class PipeDataNodeTaskBuilder {
 
@@ -187,6 +184,10 @@ public class PipeDataNodeTaskBuilder {
                 || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
       }
 
+      if (!insertionDeletionListeningOptionPair.right
+          && !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
+        return;
+      }
     } catch (final IllegalPathException e) {
       LOGGER.warn(
           "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' 
parameters: {}",
@@ -195,50 +196,29 @@ public class PipeDataNodeTaskBuilder {
       return;
     }
 
-    if (insertionDeletionListeningOptionPair.right
-        || shouldTerminatePipeOnAllHistoricalEventsConsumed) {
-      final Boolean isRealtime =
-          connectorParameters.getBooleanByKeys(
-              PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
-              PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
-      if (isRealtime == null) {
-        
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
"false");
-        if (insertionDeletionListeningOptionPair.right) {
-          LOGGER.info(
-              "PipeDataNodeTaskBuilder: When 'inclusion' contains 
'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues 
after deletion.");
-        } else {
-          LOGGER.info(
-              "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
-        }
-      } else if (isRealtime) {
-        if (insertionDeletionListeningOptionPair.right) {
-          LOGGER.warn(
-              "PipeDataNodeTaskBuilder: When 'inclusion' includes 
'data.delete', 'realtime-first' set to 'true' may result in data 
synchronization issues after deletion.");
-        } else {
-          LOGGER.warn(
-              "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
-        }
+    final Boolean isRealtime =
+        connectorParameters.getBooleanByKeys(
+            PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
+            PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
+    if (isRealtime == null) {
+      
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
"false");
+      if (insertionDeletionListeningOptionPair.right) {
+        LOGGER.info(
+            "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 
'realtime-first' is defaulted to 'false' to prevent sync issues after 
deletion.");
+      } else {
+        LOGGER.info(
+            "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
       }
+      return;
     }
 
-    final boolean isRealtimeEnabled =
-        extractorParameters.getBooleanOrDefault(
-            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
-            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
-
-    if (isRealtimeEnabled && 
!shouldTerminatePipeOnAllHistoricalEventsConsumed) {
-      final Boolean enableSendTsFileLimit =
-          connectorParameters.getBooleanByKeys(
-              PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
-              PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
-
-      if (enableSendTsFileLimit == null) {
-        
connectorParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
 "true");
-        LOGGER.info(
-            "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we 
enable rate limiter in sending tsfile by default to reserve disk and network IO 
for realtime sending.");
-      } else if (!enableSendTsFileLimit) {
+    if (isRealtime) {
+      if (insertionDeletionListeningOptionPair.right) {
+        LOGGER.warn(
+            "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 
'realtime-first' set to 'true' may result in data synchronization issues after 
deletion.");
+      } else {
         LOGGER.warn(
-            "PipeDataNodeTaskBuilder: When the realtime sync is enabled, not 
enabling the rate limiter in sending tsfile may introduce delay for realtime 
sending.");
+            "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 4156aea4bc5..54f9dfe4092 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -26,9 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.metrics.AbstractMetricService;
-import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
-import org.apache.iotdb.metrics.type.Counter;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
@@ -43,7 +41,6 @@ public class PipeResourceMetrics implements IMetricSet {
 
   private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory";
 
-  private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
   private static final String PIPE_FLOATING_MEMORY = "PipeFloatingMemory";
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
@@ -99,10 +96,6 @@ public class PipeResourceMetrics implements IMetricSet {
         MetricLevel.IMPORTANT,
         PipeDataNodeResourceManager.ref(),
         PipePhantomReferenceManager::getPhantomReferenceCount);
-    // tsFile send rate
-    diskIOCounter =
-        metricService.getOrCreateCounter(
-            Metric.PIPE_TSFILE_SEND_DISK_IO.toString(), MetricLevel.IMPORTANT);
   }
 
   @Override
@@ -137,12 +130,6 @@ public class PipeResourceMetrics implements IMetricSet {
     metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_LINKED_TSFILE_SIZE.toString());
     // phantom reference count
     metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
-
-    metricService.remove(MetricType.RATE, 
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
-  }
-
-  public void recordDiskIO(final long bytes) {
-    diskIOCounter.inc(bytes);
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 5e2dfb6f9a0..8f776b21759 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -21,14 +21,12 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
@@ -41,8 +39,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -55,33 +51,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
-
 @TreeModel
 @TableModel
 public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionAirGapSink.class);
 
-  private boolean enableSendTsFileLimit;
-
-  @Override
-  public void customize(
-      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
-      throws Exception {
-    super.customize(parameters, configuration);
-
-    enableSendTsFileLimit =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, 
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
-            CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
-  }
-
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
@@ -376,14 +353,6 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     }
   }
 
-  @Override
-  protected void mayLimitRateAndRecordIO(final long requiredBytes) {
-    PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
-    if (enableSendTsFileLimit) {
-      TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
-    }
-  }
-
   @Override
   protected byte[] getTransferSingleFilePieceBytes(
       final String fileName, final long position, final byte[] payLoad) throws 
IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index 3634b2396a4..4cc5bb055d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -197,11 +197,6 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink {
     }
   }
 
-  @Override
-  protected void mayLimitRateAndRecordIO(final long requiredBytes) {
-    // Do nothing
-  }
-
   @Override
   protected byte[] getTransferSingleFilePieceBytes(
       final String fileName, final long position, final byte[] payLoad) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 21fb0c9a730..1f35a96b84d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -81,11 +81,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
@@ -124,8 +121,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
 
-  private boolean enableSendTsFileLimit;
-
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     super.validate(validator);
@@ -181,11 +176,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
     }
-
-    enableSendTsFileLimit =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, 
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
-            CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
   }
 
   @Override
@@ -711,10 +701,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     events.forEach(this::addFailureEventToRetryQueue);
   }
 
-  public boolean isEnableSendTsFileLimit() {
-    return enableSendTsFileLimit;
-  }
-
   //////////////////////////// Operations for close 
////////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 9f8fbcf44b0..4ca34073d44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -23,11 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
@@ -169,10 +167,6 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     client.setShouldReturnSelf(false);
     client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
-    PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
-    if (connector.isEnableSendTsFileLimit()) {
-      TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
-    }
     final int readLength = reader.read(readBuffer);
 
     if (readLength == -1) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index b3bded4a2bd..4b6ec2a843b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
-import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -32,7 +31,6 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
 import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
@@ -71,16 +69,11 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.NoSuchFileException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
-
 @TreeModel
 @TableModel
 public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink {
@@ -88,7 +81,6 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionSyncSink.class);
 
   private PipeTransferBatchReqBuilder tabletBatchBuilder;
-  private boolean enableSendTsFileLimit;
 
   @Override
   public void customize(
@@ -100,11 +92,6 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
     }
-
-    enableSendTsFileLimit =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, 
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
-            CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
   }
 
   @Override
@@ -119,14 +106,6 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
     return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, 
position, payLoad);
   }
 
-  @Override
-  protected void mayLimitRateAndRecordIO(final long requiredBytes) {
-    PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
-    if (enableSendTsFileLimit) {
-      TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
-    }
-  }
-
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
index e1e1e7868d7..ab18ac32dc6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
@@ -244,9 +244,4 @@ public class IoTDBSchemaRegionSink extends IoTDBDataNodeSyncSink {
       final String fileName, final long position, final byte[] payLoad) throws 
IOException {
     return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, 
position, payLoad);
   }
-
-  @Override
-  protected void mayLimitRateAndRecordIO(final long requiredBytes) {
-    // Do nothing
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 12bf5bc6db2..72dc0b51754 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -336,19 +336,15 @@ public class IoTDBDataRegionSource extends IoTDBSource {
             EXTRACTOR_END_TIME_KEY)
         && parameters.hasAnyAttributes(
             EXTRACTOR_HISTORY_ENABLE_KEY,
+            EXTRACTOR_REALTIME_ENABLE_KEY,
             SOURCE_HISTORY_ENABLE_KEY,
-            SOURCE_HISTORY_START_TIME_KEY,
-            EXTRACTOR_HISTORY_START_TIME_KEY,
-            SOURCE_HISTORY_END_TIME_KEY,
-            EXTRACTOR_HISTORY_END_TIME_KEY)) {
+            SOURCE_REALTIME_ENABLE_KEY)) {
       LOGGER.warn(
-          "When {}, {}, {} or {} is specified, specifying {}, {}, {}, {}, {} 
and {} is invalid.",
+          "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is 
invalid.",
           SOURCE_START_TIME_KEY,
           EXTRACTOR_START_TIME_KEY,
           SOURCE_END_TIME_KEY,
           EXTRACTOR_END_TIME_KEY,
-          SOURCE_HISTORY_ENABLE_KEY,
-          EXTRACTOR_HISTORY_ENABLE_KEY,
           SOURCE_HISTORY_START_TIME_KEY,
           EXTRACTOR_HISTORY_START_TIME_KEY,
           SOURCE_HISTORY_END_TIME_KEY,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
index f616c6695e5..eaaf376a819 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
@@ -19,28 +19,78 @@
 
 package org.apache.iotdb.db.storageengine.load.limiter;
 
-import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRateLimiter;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
 
-public class LoadTsFileRateLimiter extends GlobalRateLimiter {
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.util.concurrent.TimeUnit;
+
+public class LoadTsFileRateLimiter {
 
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
-  @Override
+  private final AtomicDouble throughputBytesPerSecond =
+      new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
+  private final RateLimiter loadWriteRateLimiter;
+
   public void acquire(long bytes) {
     LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes);
-    super.acquire(bytes);
+
+    if (reloadParams()) {
+      return;
+    }
+
+    while (bytes > 0) {
+      if (bytes > Integer.MAX_VALUE) {
+        tryAcquireWithRateCheck(Integer.MAX_VALUE);
+        bytes -= Integer.MAX_VALUE;
+      } else {
+        tryAcquireWithRateCheck((int) bytes);
+        return;
+      }
+    }
   }
 
-  @Override
-  protected double getThroughputBytesPerSecond() {
-    return CONFIG.getLoadWriteThroughputBytesPerSecond();
+  private void tryAcquireWithRateCheck(final int bytes) {
+    while (!loadWriteRateLimiter.tryAcquire(
+        bytes,
+        PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
+        TimeUnit.MILLISECONDS)) {
+      if (reloadParams()) {
+        return;
+      }
+    }
+  }
+
+  private boolean reloadParams() {
+    final double throughputBytesPerSecondLimit = 
CONFIG.getLoadWriteThroughputBytesPerSecond();
+
+    if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
+      throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
+      loadWriteRateLimiter.setRate(
+          // if throughput <= 0, disable rate limiting
+          throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : 
throughputBytesPerSecondLimit);
+    }
+
+    // For performance, we don't need to acquire rate limiter if throughput <= 0
+    return throughputBytesPerSecondLimit <= 0;
   }
 
   //////////////////////////// Singleton ////////////////////////////
 
+  private LoadTsFileRateLimiter() {
+    final double throughputBytesPerSecondLimit = 
throughputBytesPerSecond.get();
+    loadWriteRateLimiter =
+        // if throughput <= 0, disable rate limiting
+        throughputBytesPerSecondLimit <= 0
+            ? RateLimiter.create(Double.MAX_VALUE)
+            : RateLimiter.create(throughputBytesPerSecondLimit);
+  }
+
   private static class LoadTsFileRateLimiterHolder {
 
     private static final LoadTsFileRateLimiter INSTANCE = new 
LoadTsFileRateLimiter();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 648cfeaad68..3031518edc3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -273,7 +273,6 @@ public class CommonConfig {
   private int pipeAsyncConnectorMaxTsFileClientNumber =
       Math.max(16, Runtime.getRuntime().availableProcessors());
 
-  private double pipeSendTsFileRateLimitBytesPerSecond = 32 * MB;
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
   private int rateLimiterHotReloadCheckIntervalMs = 1000;
 
@@ -1867,21 +1866,6 @@ public class CommonConfig {
         pipeCheckSyncAllClientLiveTimeIntervalMs);
   }
 
-  public double getPipeSendTsFileRateLimitBytesPerSecond() {
-    return pipeSendTsFileRateLimitBytesPerSecond;
-  }
-
-  public void setPipeSendTsFileRateLimitBytesPerSecond(
-      double pipeSendTsFileRateLimitBytesPerSecond) {
-    if (this.pipeSendTsFileRateLimitBytesPerSecond == pipeSendTsFileRateLimitBytesPerSecond) {
-      return;
-    }
-    this.pipeSendTsFileRateLimitBytesPerSecond = pipeSendTsFileRateLimitBytesPerSecond;
-    logger.info(
-        "pipeSendTsFileRateLimitBytesPerSecond is set to {}",
-        pipeSendTsFileRateLimitBytesPerSecond);
-  }
-
   public double getPipeAllSinksRateLimitBytesPerSecond() {
     return pipeAllSinksRateLimitBytesPerSecond;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a646ed90dbf..7d1064c7462 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -217,10 +217,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
   }
 
-  public double getPipeSendTsFileRateLimitBytesPerSecond() {
-    return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
-  }
-
   public double getPipeAllConnectorsRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
@@ -541,8 +537,6 @@ public class PipeConfig {
         "PipeAsyncConnectorMaxTsFileClientNumber: {}",
         getPipeAsyncConnectorMaxTsFileClientNumber());
 
-    LOGGER.info(
-        "PipeSendTsFileRateLimitBytesPerSecond: {}", getPipeSendTsFileRateLimitBytesPerSecond());
     LOGGER.info(
         "PipeAllConnectorsRateLimitBytesPerSecond: {}",
         getPipeAllConnectorsRateLimitBytesPerSecond());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 7bdfab83efa..895b940ebf8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -578,12 +578,6 @@ public class PipeDescriptor {
       config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
     }
 
-    value =
-        parserPipeConfig(properties, "pipe_send_tsfile_rate_limit_bytes_per_second", isHotModify);
-    if (value != null) {
-      config.setPipeSendTsFileRateLimitBytesPerSecond(Double.parseDouble(value));
-    }
-
     value = parserPipeConfig(properties, "pipe_all_sinks_rate_limit_bytes_per_second", isHotModify);
     if (value != null) {
       config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index e09dd0ab5dd..42848eee8fc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -220,11 +220,6 @@ public class PipeSinkConstant {
   public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE = Zstd.minCompressionLevel();
   public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE = Zstd.maxCompressionLevel();
 
-  public static final String CONNECTOR_ENABLE_SEND_TSFILE_LIMIT =
-      "connector.enable-send-tsfile-limit";
-  public static final String SINK_ENABLE_SEND_TSFILE_LIMIT = "sink.enable-send-tsfile-limit";
-  public static final boolean CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE = false;
-
   public static final String CONNECTOR_RATE_LIMIT_KEY = "connector.rate-limit-bytes-per-second";
   public static final String SINK_RATE_LIMIT_KEY = "sink.rate-limit-bytes-per-second";
   public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
deleted file mode 100644
index 9a6aba5b90b..00000000000
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.pipe.sink.limiter;
-
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
-/** This is a global rate limiter for all connectors. */
-public class GlobalRPCRateLimiter extends GlobalRateLimiter {
-
-  private static final PipeConfig CONFIG = PipeConfig.getInstance();
-
-  @Override
-  protected double getThroughputBytesPerSecond() {
-    return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
-  }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java
index 358e6c844ec..10f465870ee 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java
@@ -26,17 +26,18 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import java.util.concurrent.TimeUnit;
 
-public abstract class GlobalRateLimiter {
+/** This is a global rate limiter for all connectors. */
+public class GlobalRateLimiter {
 
-  private final AtomicDouble throughputBytesPerSecond =
-      new AtomicDouble(getThroughputBytesPerSecond());
+  private static final PipeConfig CONFIG = PipeConfig.getInstance();
 
+  private final AtomicDouble throughputBytesPerSecond =
+      new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond());
   private final RateLimiter rateLimiter;
 
   public GlobalRateLimiter() {
     final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get();
     rateLimiter =
-        // if throughput <= 0, disable rate limiting
         throughputBytesPerSecondLimit <= 0
             ? RateLimiter.create(Double.MAX_VALUE)
             : RateLimiter.create(throughputBytesPerSecondLimit);
@@ -70,7 +71,8 @@ public abstract class GlobalRateLimiter {
   }
 
   private boolean reloadParams() {
-    final double throughputBytesPerSecondLimit = getThroughputBytesPerSecond();
+    final double throughputBytesPerSecondLimit =
+        CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
 
     if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
       throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
@@ -82,6 +84,4 @@ public abstract class GlobalRateLimiter {
     // For performance, we don't need to acquire rate limiter if throughput <= 0
     return throughputBytesPerSecondLimit <= 0;
   }
-
-  protected abstract double getThroughputBytesPerSecond();
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/TsFileSendRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/TsFileSendRateLimiter.java
deleted file mode 100644
index e7f1cc415b7..00000000000
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/TsFileSendRateLimiter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.pipe.sink.limiter;
-
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
-public class TsFileSendRateLimiter extends GlobalRateLimiter {
-
-  private static final PipeConfig CONFIG = PipeConfig.getInstance();
-
-  @Override
-  protected double getThroughputBytesPerSecond() {
-    return CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
-  }
-
-  //////////////////////////// Singleton ////////////////////////////
-
-  private static class TsFileSendRateLimiterHolder {
-
-    private static final TsFileSendRateLimiter INSTANCE = new TsFileSendRateLimiter();
-
-    private TsFileSendRateLimiterHolder() {
-      // Prevent instantiation
-    }
-  }
-
-  public static TsFileSendRateLimiter getInstance() {
-    return TsFileSendRateLimiter.TsFileSendRateLimiterHolder.INSTANCE;
-  }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index aba6c7e1d82..a9fe17ca0cc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -274,7 +274,6 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
       while (true) {
-        mayLimitRateAndRecordIO(readFileBufferSize);
         final int readLength = reader.read(readBuffer);
         if (readLength == -1) {
           break;
@@ -310,8 +309,6 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     }
   }
 
-  protected abstract void mayLimitRateAndRecordIO(final long requiredBytes);
-
   protected abstract boolean mayNeedHandshakeWhenFail();
 
   protected abstract byte[] getTransferSingleFilePieceBytes(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index efee7c9c311..0babe802639 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressor;
 import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorConfig;
 import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory;
-import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter;
+import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRateLimiter;
 import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter;
 import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -170,7 +170,7 @@ public abstract class IoTDBSink implements PipeConnector {
   private static final Map<Pair<String, Long>, PipeEndPointRateLimiter>
       PIPE_END_POINT_RATE_LIMITER_MAP = new ConcurrentHashMap<>();
   private double endPointRateLimitBytesPerSecond = -1;
-  private static final GlobalRPCRateLimiter GLOBAL_RATE_LIMITER = new GlobalRPCRateLimiter();
+  private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new GlobalRateLimiter();
 
   protected boolean isTabletBatchModeEnabled = true;
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index 710a57f4f17..66d84ed7d2c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -186,7 +186,6 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
       while (true) {
-        mayLimitRateAndRecordIO(readFileBufferSize);
         final int readLength = reader.read(readBuffer);
         if (readLength == -1) {
           break;
@@ -256,8 +255,6 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
   protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq(
       final String fileName, final long position, final byte[] payLoad) throws IOException;
 
-  protected abstract void mayLimitRateAndRecordIO(final long requiredBytes);
-
   @Override
   public void close() {
     if (clientManager != null) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index c226b53a3ee..0cca2c87a31 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -171,7 +171,6 @@ public enum Metric {
   PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
   PIPE_LINKED_TSFILE_SIZE("pipe_linked_tsfile_size"),
   PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"),
-  PIPE_TSFILE_SEND_DISK_IO("pipe_tsfile_send_disk_io"),
   PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"),
   PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
   PIPE_PROCEDURE("pipe_procedure"),


Reply via email to