This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 89af73d2088 Pipe: Smoothed the rate in pipe's remaining time calculations (#12699)
89af73d2088 is described below

commit 89af73d2088fdde6adf7a48713cd1e7312cfe218
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 17 11:43:49 2024 +0800

    Pipe: Smoothed the rate in pipe's remaining time calculations (#12699)
---
 .../java/org/apache/iotdb/CountPointProcessor.java |  14 +-
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   |  10 ++
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  |  13 +-
 .../pipe/event/PipeConfigRegionWritePlanEvent.java |   8 +-
 .../metric/PipeConfigNodeRemainingTimeMetrics.java |  34 +++--
 .../PipeConfigNodeRemainingTimeOperator.java       |  99 +++++++------
 .../manager/pipe/task/PipeConfigNodeTaskStage.java |  16 +--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  11 ++
 .../db/pipe/event/UserDefinedEnrichedEvent.java    |  23 +--
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  15 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |   1 +
 .../schema/PipeSchemaRegionSnapshotEvent.java      |  14 +-
 .../schema/PipeSchemaRegionWritePlanEvent.java     |   8 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  66 +++++----
 .../common/tablet/PipeRawTabletInsertionEvent.java |  75 +++++-----
 .../event/common/terminate/PipeTerminateEvent.java |  10 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  52 +++----
 .../tsfile/TsFileInsertionDataContainer.java       |   2 +
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |  11 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   6 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   7 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   2 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  |  46 +++---
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 158 ++++++++++++---------
 .../twostage/plugin/TwoStageCountProcessor.java    |   2 +-
 .../pipe/task/connection/PipeEventCollector.java   |   1 +
 iotdb-core/node-commons/pom.xml                    |   4 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  25 +++-
 .../iotdb/commons/conf/CommonDescriptor.java       |  16 ++-
 .../enums/PipeRemainingTimeRateAverageTime.java    |  49 +++++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  12 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  15 +-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  32 +++--
 .../commons/pipe/event/PipeSnapshotEvent.java      |   3 +-
 .../commons/pipe/event/PipeWritePlanEvent.java     |   3 +-
 .../commons/pipe/event/ProgressReportEvent.java    |   7 +-
 .../extractor/IoTDBNonDataRegionExtractor.java     |  11 +-
 .../commons/pipe/metric/PipeRemainingOperator.java |  87 ++++++++++++
 .../pipe/progress/PipeEventCommitManager.java      |  43 +++---
 39 files changed, 672 insertions(+), 339 deletions(-)

diff --git 
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
 
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
index 838090e7a3a..1e13b27b3a3 100644
--- 
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
+++ 
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
@@ -44,26 +44,28 @@ public class CountPointProcessor implements PipeProcessor {
   private PartialPath aggregateSeries;
 
   @Override
-  public void validate(PipeParameterValidator validator) {
+  public void validate(final PipeParameterValidator validator) {
     validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
   }
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeProcessorRuntimeConfiguration 
configuration)
       throws Exception {
     this.aggregateSeries = new 
PartialPath(parameters.getString(AGGREGATE_SERIES_KEY));
   }
 
   @Override
-  public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector) {
+  public void process(
+      final TabletInsertionEvent tabletInsertionEvent, final EventCollector 
eventCollector) {
     tabletInsertionEvent.processTablet(
         (tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
   }
 
   @Override
-  public void process(Event event, EventCollector eventCollector) throws 
Exception {
+  public void process(final Event event, final EventCollector eventCollector) 
throws Exception {
     if (event instanceof PipeHeartbeatEvent) {
-      Tablet tablet =
+      final Tablet tablet =
           new Tablet(
               aggregateSeries.getDevice(),
               Collections.singletonList(
@@ -73,7 +75,7 @@ public class CountPointProcessor implements PipeProcessor {
       tablet.addTimestamp(0, System.currentTimeMillis());
       tablet.addValue(aggregateSeries.getMeasurement(), 0, 
writePointCount.get());
       eventCollector.collect(
-          new PipeRawTabletInsertionEvent(tablet, false, null, null, null, 
false));
+          new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, 
false));
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index fb8507be9d4..abfea097dbf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -64,6 +64,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
     return PipeConfigNodeAgent.runtime().isShutdown();
   }
 
+  @Override
+  protected void thawRate(final String pipeName, final long creationTime) {
+    PipeConfigNodeRemainingTimeMetrics.getInstance().thawRate(pipeName + "_" + 
creationTime);
+  }
+
+  @Override
+  protected void freezeRate(final String pipeName, final long creationTime) {
+    PipeConfigNodeRemainingTimeMetrics.getInstance().freezeRate(pipeName + "_" 
+ creationTime);
+  }
+
   @Override
   protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta 
pipeMetaFromConfigNode)
       throws IllegalPathException {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index dcdc0f8f81d..56c9d012c3c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -90,7 +90,7 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   public PipeConfigRegionSnapshotEvent(
       final String snapshotPath, final String templateFilePath, final 
CNSnapshotFileType type) {
-    this(snapshotPath, templateFilePath, type, null, null, null);
+    this(snapshotPath, templateFilePath, type, null, 0, null, null);
   }
 
   public PipeConfigRegionSnapshotEvent(
@@ -98,9 +98,15 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final String templateFilePath,
       final CNSnapshotFileType type,
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern) {
-    super(pipeName, pipeTaskMeta, pattern, 
PipeConfigNodeSnapshotResourceManager.getInstance());
+    super(
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        pattern,
+        PipeConfigNodeSnapshotResourceManager.getInstance());
     this.snapshotPath = snapshotPath;
     this.templateFilePath = Objects.nonNull(templateFilePath) ? 
templateFilePath : "";
     this.fileType = type;
@@ -157,12 +163,13 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
     return new PipeConfigRegionSnapshotEvent(
-        snapshotPath, templateFilePath, fileType, pipeName, pipeTaskMeta, 
pattern);
+        snapshotPath, templateFilePath, fileType, pipeName, creationTime, 
pipeTaskMeta, pattern);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index c6996907460..0d780bafca1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -41,16 +41,17 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
 
   public PipeConfigRegionWritePlanEvent(
       final ConfigPhysicalPlan configPhysicalPlan, final boolean 
isGeneratedByPipe) {
-    this(configPhysicalPlan, null, null, null, isGeneratedByPipe);
+    this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe);
   }
 
   public PipeConfigRegionWritePlanEvent(
       final ConfigPhysicalPlan configPhysicalPlan,
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final boolean isGeneratedByPipe) {
-    super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
+    super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
     this.configPhysicalPlan = configPhysicalPlan;
   }
 
@@ -61,12 +62,13 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
     return new PipeConfigRegionWritePlanEvent(
-        configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
+        configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern, 
false);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index 378251eb45c..557e355596e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.confignode.manager.pipe.metric;
 
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -110,6 +109,26 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
     }
   }
 
+  public void thawRate(final String pipeID) {
+    if (!remainingTimeOperatorMap.containsKey(pipeID)) {
+      LOGGER.warn(
+          "Failed to thaw pipe remaining time rate, RemainingTimeOperator({}) 
does not exist",
+          pipeID);
+      return;
+    }
+    remainingTimeOperatorMap.get(pipeID).thawRate(true);
+  }
+
+  public void freezeRate(final String pipeID) {
+    if (!remainingTimeOperatorMap.containsKey(pipeID)) {
+      LOGGER.warn(
+          "Failed to freeze pipe remaining time rate, 
RemainingTimeOperator({}) does not exist",
+          pipeID);
+      return;
+    }
+    remainingTimeOperatorMap.get(pipeID).freezeRate(true);
+  }
+
   public void deregister(final String pipeID) {
     if (!remainingTimeOperatorMap.containsKey(pipeID)) {
       LOGGER.warn(
@@ -122,12 +141,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
     }
   }
 
-  public void markRegionCommit(final PipeTaskRuntimeEnvironment 
pipeTaskRuntimeEnvironment) {
-    // Filter commit attempt from assigner
-    final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
-    final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
-    final String pipeID = pipeName + "_" + creationTime;
-
+  public void markRegionCommit(final String pipeID, final boolean 
isDataRegion) {
     if (Objects.isNull(metricService)) {
       return;
     }
@@ -137,12 +151,6 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
           "Failed to mark pipe region commit, RemainingTimeOperator({}) does 
not exist", pipeID);
       return;
     }
-    // Prevent not set pipeName / creation times & potential differences 
between pipeNames and
-    // creation times
-    if (!Objects.equals(pipeName, operator.getPipeName())
-        || !Objects.equals(creationTime, operator.getCreationTime())) {
-      return;
-    }
 
     operator.markConfigRegionCommit();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
index 201aa775de5..7a5044b0c61 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
@@ -19,77 +19,72 @@
 
 package org.apache.iotdb.confignode.manager.pipe.metric;
 
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
+import 
org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
 import 
org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
 
 import com.codahale.metrics.Clock;
 import com.codahale.metrics.ExponentialMovingAverages;
 import com.codahale.metrics.Meter;
 
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
-class PipeConfigNodeRemainingTimeOperator {
+class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
 
-  private static final long CONFIG_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60 
* 60L; // 1 year
+  private final Set<IoTDBConfigRegionExtractor> configRegionExtractors =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final AtomicReference<Meter> configRegionCommitMeter = new 
AtomicReference<>(null);
 
-  private String pipeName;
-  private long creationTime = 0;
-
-  private final ConcurrentMap<IoTDBConfigRegionExtractor, 
IoTDBConfigRegionExtractor>
-      configRegionExtractors = new ConcurrentHashMap<>();
-  private final Meter configRegionCommitMeter =
-      new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
-
-  private double lastConfigRegionCommitSmoothingValue = Long.MIN_VALUE;
-
-  //////////////////////////// Tags ////////////////////////////
-
-  String getPipeName() {
-    return pipeName;
-  }
-
-  long getCreationTime() {
-    return creationTime;
-  }
+  private double lastConfigRegionCommitSmoothingValue = Long.MAX_VALUE;
 
   //////////////////////////// Remaining time calculation 
////////////////////////////
 
   /**
-   * This will calculate the estimated remaining time of the given pipe's 
config region subTask.
+   * This will calculate the estimated remaining time of the given pipe's 
{@link
+   * PipeConfigNodeSubtask}.
    *
    * @return The estimated remaining time
    */
   double getRemainingTime() {
-    final double pipeRemainingTimeCommitRateSmoothingFactor =
-        
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
+    final PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
+        PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
 
     // Do not calculate heartbeat event
     final long totalConfigRegionWriteEventCount =
-        configRegionExtractors.keySet().stream()
+        configRegionExtractors.stream()
             .map(IoTDBConfigRegionExtractor::getUnTransferredEventCount)
             .reduce(Long::sum)
             .orElse(0L);
 
-    lastConfigRegionCommitSmoothingValue =
-        lastConfigRegionCommitSmoothingValue == Long.MIN_VALUE
-            ? configRegionCommitMeter.getOneMinuteRate()
-            : pipeRemainingTimeCommitRateSmoothingFactor
-                    * configRegionCommitMeter.getOneMinuteRate()
-                + (1 - pipeRemainingTimeCommitRateSmoothingFactor)
-                    * lastConfigRegionCommitSmoothingValue;
+    configRegionCommitMeter.updateAndGet(
+        meter -> {
+          if (Objects.nonNull(meter)) {
+            lastConfigRegionCommitSmoothingValue =
+                pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+          }
+          return meter;
+        });
+
     final double configRegionRemainingTime;
     if (totalConfigRegionWriteEventCount <= 0) {
+      notifyEmpty();
       configRegionRemainingTime = 0;
     } else {
+      notifyNonEmpty();
       configRegionRemainingTime =
           lastConfigRegionCommitSmoothingValue <= 0
               ? Double.MAX_VALUE
               : totalConfigRegionWriteEventCount / 
lastConfigRegionCommitSmoothingValue;
     }
 
-    return configRegionRemainingTime >= CONFIG_NODE_REMAINING_MAX_SECONDS
-        ? CONFIG_NODE_REMAINING_MAX_SECONDS
+    return configRegionRemainingTime >= REMAINING_MAX_SECONDS
+        ? REMAINING_MAX_SECONDS
         : configRegionRemainingTime;
   }
 
@@ -97,17 +92,37 @@ class PipeConfigNodeRemainingTimeOperator {
 
   void register(final IoTDBConfigRegionExtractor extractor) {
     setNameAndCreationTime(extractor.getPipeName(), 
extractor.getCreationTime());
-    configRegionExtractors.put(extractor, extractor);
-  }
-
-  private void setNameAndCreationTime(final String pipeName, final long 
creationTime) {
-    this.pipeName = pipeName;
-    this.creationTime = creationTime;
+    configRegionExtractors.add(extractor);
   }
 
   //////////////////////////// Rate ////////////////////////////
 
   void markConfigRegionCommit() {
-    configRegionCommitMeter.mark();
+    configRegionCommitMeter.updateAndGet(
+        meter -> {
+          if (Objects.nonNull(meter)) {
+            meter.mark();
+          }
+          return meter;
+        });
+  }
+
+  //////////////////////////// Switch ////////////////////////////
+
+  @Override
+  public synchronized void thawRate(final boolean isStartPipe) {
+    super.thawRate(isStartPipe);
+    // The stopped pipe's rate should only be thawed by "startPipe" command
+    if (isStopped) {
+      return;
+    }
+    configRegionCommitMeter.compareAndSet(
+        null, new Meter(new ExponentialMovingAverages(), 
Clock.defaultClock()));
+  }
+
+  @Override
+  public synchronized void freezeRate(final boolean isStopPipe) {
+    super.freezeRate(isStopPipe);
+    configRegionCommitMeter.set(null);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
index 36eec65b69e..7e676903857 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
@@ -32,12 +32,12 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
   private final PipeConfigNodeSubtask subtask;
 
   public PipeConfigNodeTaskStage(
-      String pipeName,
-      long creationTime,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes,
-      PipeTaskMeta pipeTaskMeta) {
+      final String pipeName,
+      final long creationTime,
+      final Map<String, String> extractorAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> connectorAttributes,
+      final PipeTaskMeta pipeTaskMeta) {
 
     try {
       subtask =
@@ -48,7 +48,7 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
               processorAttributes,
               connectorAttributes,
               pipeTaskMeta);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeException(
           String.format(
               "Failed to create subtask for pipe %s, creation time %d", 
pipeName, creationTime),
@@ -63,8 +63,6 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
 
   @Override
   public void startSubtask() throws PipeException {
-    // IoTDBConfigRegionExtractor is started by executor because starting
-    // here may cause deadlock when triggering snapshot
     PipeConfigNodeSubtaskExecutor.getInstance().start(subtask.getTaskID());
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index a6abcbb9e9a..1c3862bb991 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -237,6 +237,17 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
             });
   }
 
+  @Override
+  protected void thawRate(final String pipeName, final long creationTime) {
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + 
"_" + creationTime);
+  }
+
+  @Override
+  protected void freezeRate(final String pipeName, final long creationTime) {
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        .freezeRate(pipeName + "_" + creationTime);
+  }
+
   @Override
   protected boolean dropPipe(final String pipeName, final long creationTime) {
     if (!super.dropPipe(pipeName, creationTime)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index ff6e34fffcf..4803158db2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -31,7 +31,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
   private final UserDefinedEvent userDefinedEvent;
   private final EnrichedEvent enrichedEvent;
 
-  public static Event maybeOf(Event event) {
+  public static Event maybeOf(final Event event) {
     return event instanceof UserDefinedEvent
             && ((UserDefinedEvent) event).getSourceEvent() instanceof 
EnrichedEvent
         ? new UserDefinedEnrichedEvent(
@@ -39,9 +39,11 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
         : event;
   }
 
-  private UserDefinedEnrichedEvent(UserDefinedEvent userDefinedEvent, 
EnrichedEvent enrichedEvent) {
+  private UserDefinedEnrichedEvent(
+      final UserDefinedEvent userDefinedEvent, final EnrichedEvent 
enrichedEvent) {
     super(
         enrichedEvent.getPipeName(),
+        enrichedEvent.getCreationTime(),
         enrichedEvent.getPipeTaskMeta(),
         enrichedEvent.getPipePattern(),
         enrichedEvent.getStartTime(),
@@ -55,12 +57,12 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent 
{
   }
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     return 
enrichedEvent.internallyIncreaseResourceReferenceCount(holderMessage);
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     return 
enrichedEvent.internallyDecreaseResourceReferenceCount(holderMessage);
   }
 
@@ -71,13 +73,14 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent 
{
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-        pipeName, pipeTaskMeta, pattern, startTime, endTime);
+        pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 9738f86a5dd..f82d799b7d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -38,7 +38,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHeartbeatEvent.class);
 
   private final String dataRegionId;
-  private String pipeName;
 
   private long timePublished;
   private long timeAssigned;
@@ -60,18 +59,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private final boolean shouldPrintMessage;
 
   public PipeHeartbeatEvent(final String dataRegionId, final boolean 
shouldPrintMessage) {
-    super(null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+    super(null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.shouldPrintMessage = shouldPrintMessage;
   }
 
   public PipeHeartbeatEvent(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final String dataRegionId,
       final long timePublished,
       final boolean shouldPrintMessage) {
-    super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
+    super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.timePublished = timePublished;
     this.shouldPrintMessage = shouldPrintMessage;
@@ -100,6 +100,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
@@ -107,7 +108,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report 
exceptions.
     // Here we ignore parameters `pattern`, `startTime`, and `endTime`.
     return new PipeHeartbeatEvent(
-        pipeName, pipeTaskMeta, dataRegionId, timePublished, 
shouldPrintMessage);
+        pipeName, creationTime, pipeTaskMeta, dataRegionId, timePublished, 
shouldPrintMessage);
   }
 
   @Override
@@ -128,12 +129,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   /////////////////////////////// Delay Reporting 
///////////////////////////////
 
-  public void bindPipeName(final String pipeName) {
-    if (shouldPrintMessage) {
-      this.pipeName = pipeName;
-    }
-  }
-
   public void onPublished() {
     if (shouldPrintMessage) {
       timePublished = System.currentTimeMillis();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 1b747652e1c..cb2183b2000 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -105,6 +105,7 @@ public class PipeRowCollector implements RowCollector {
               tablet,
               isAligned,
               sourceEvent == null ? null : sourceEvent.getPipeName(),
+              sourceEvent == null ? 0 : sourceEvent.getCreationTime(),
               pipeTaskMeta,
               sourceEvent,
               false));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index d647285bdc6..60bbf95feee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -78,7 +78,7 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   public PipeSchemaRegionSnapshotEvent(
       final String mTreeSnapshotPath, final String tagLogSnapshotPath, final 
String databaseName) {
-    this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, null, 
null);
+    this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null, 
null);
   }
 
   public PipeSchemaRegionSnapshotEvent(
@@ -86,9 +86,10 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final String tagLogSnapshotPath,
       final String databaseName,
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern) {
-    super(pipeName, pipeTaskMeta, pattern, PipeResourceManager.snapshot());
+    super(pipeName, creationTime, pipeTaskMeta, pattern, 
PipeResourceManager.snapshot());
     this.mTreeSnapshotPath = mTreeSnapshotPath;
     this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ? 
tagLogSnapshotPath : "";
     this.databaseName = databaseName;
@@ -145,12 +146,19 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
     return new PipeSchemaRegionSnapshotEvent(
-        mTreeSnapshotPath, tagLogSnapshotPath, databaseName, pipeName, 
pipeTaskMeta, pattern);
+        mTreeSnapshotPath,
+        tagLogSnapshotPath,
+        databaseName,
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        pattern);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index 6e0b5cccf86..d6eefe086e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -40,16 +40,17 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
   }
 
   public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean 
isGeneratedByPipe) {
-    this(planNode, null, null, null, isGeneratedByPipe);
+    this(planNode, null, 0, null, null, isGeneratedByPipe);
   }
 
   public PipeSchemaRegionWritePlanEvent(
       final PlanNode planNode,
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final boolean isGeneratedByPipe) {
-    super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
+    super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
     this.planNode = planNode;
   }
 
@@ -60,12 +61,13 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
     return new PipeSchemaRegionWritePlanEvent(
-        planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
+        planNode, pipeName, creationTime, pipeTaskMeta, pattern, 
isGeneratedByPipe);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 11711fa1d03..b9ddfd0d55d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -68,11 +68,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   private ProgressIndex progressIndex;
 
   public PipeInsertNodeTabletInsertionEvent(
-      WALEntryHandler walEntryHandler,
-      PartialPath devicePath,
-      ProgressIndex progressIndex,
-      boolean isAligned,
-      boolean isGeneratedByPipe) {
+      final WALEntryHandler walEntryHandler,
+      final PartialPath devicePath,
+      final ProgressIndex progressIndex,
+      final boolean isAligned,
+      final boolean isGeneratedByPipe) {
     this(
         walEntryHandler,
         devicePath,
@@ -80,6 +80,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         isAligned,
         isGeneratedByPipe,
         null,
+        0,
         null,
         null,
         Long.MIN_VALUE,
@@ -87,17 +88,18 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   private PipeInsertNodeTabletInsertionEvent(
-      WALEntryHandler walEntryHandler,
-      PartialPath devicePath,
-      ProgressIndex progressIndex,
-      boolean isAligned,
-      boolean isGeneratedByPipe,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
-    super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
+      final WALEntryHandler walEntryHandler,
+      final PartialPath devicePath,
+      final ProgressIndex progressIndex,
+      final boolean isAligned,
+      final boolean isGeneratedByPipe,
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
+    super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
     this.walEntryHandler = walEntryHandler;
     // Record device path here so there's no need to get it from InsertNode 
cache later.
     this.devicePath = devicePath;
@@ -129,11 +131,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       PipeResourceManager.wal().pin(walEntryHandler);
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Increase reference count for memtable %d error. Holder Message: 
%s",
@@ -144,7 +146,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       PipeResourceManager.wal().unpin(walEntryHandler);
       // Release the containers' memory.
@@ -153,7 +155,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         dataContainers = null;
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Decrease reference count for memtable %d error. Holder Message: 
%s",
@@ -164,7 +166,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public void bindProgressIndex(ProgressIndex progressIndex) {
+  public void bindProgressIndex(final ProgressIndex progressIndex) {
     this.progressIndex = progressIndex;
   }
 
@@ -175,11 +177,12 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public PipeInsertNodeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeInsertNodeTabletInsertionEvent(
         walEntryHandler,
         devicePath,
@@ -187,6 +190,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         isAligned,
         isGeneratedByPipe,
         pipeName,
+        creationTime,
         pipeTaskMeta,
         pattern,
         startTime,
@@ -224,7 +228,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         throw new UnSupportedDataTypeException(
             String.format("InsertNode type %s is not supported.", 
insertNode.getClass().getName()));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Exception occurred when determining the event time of 
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. 
Returning true to ensure data integrity.",
           this,
@@ -238,7 +242,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
-  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
+  public Iterable<TabletInsertionEvent> processRowByRow(
+      final BiConsumer<Row, RowCollector> consumer) {
     return initDataContainers().stream()
         .map(tabletInsertionDataContainer -> 
tabletInsertionDataContainer.processRowByRow(consumer))
         .flatMap(Collection::stream)
@@ -246,7 +251,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(
+      final BiConsumer<Tablet, RowCollector> consumer) {
     return initDataContainers().stream()
         .map(tabletInsertionDataContainer -> 
tabletInsertionDataContainer.processTablet(consumer))
         .flatMap(Collection::stream)
@@ -334,7 +340,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
             .map(
                 tablet ->
                     new PipeRawTabletInsertionEvent(
-                        tablet, isAligned, pipeName, pipeTaskMeta, this, 
false))
+                        tablet, isAligned, pipeName, creationTime, 
pipeTaskMeta, this, false))
             .filter(event -> !event.hasNoNeedParsingAndIsEmpty())
             .collect(Collectors.toList());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index e792f580981..239f73b3c32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -52,16 +52,17 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   private ProgressIndex overridingProgressIndex;
 
   private PipeRawTabletInsertionEvent(
-      Tablet tablet,
-      boolean isAligned,
-      EnrichedEvent sourceEvent,
-      boolean needToReport,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
-    super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
+      final Tablet tablet,
+      final boolean isAligned,
+      final EnrichedEvent sourceEvent,
+      final boolean needToReport,
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
+    super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
     this.tablet = Objects.requireNonNull(tablet);
     this.isAligned = isAligned;
     this.sourceEvent = sourceEvent;
@@ -69,18 +70,20 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   public PipeRawTabletInsertionEvent(
-      Tablet tablet,
-      boolean isAligned,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      EnrichedEvent sourceEvent,
-      boolean needToReport) {
+      final Tablet tablet,
+      final boolean isAligned,
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final EnrichedEvent sourceEvent,
+      final boolean needToReport) {
     this(
         tablet,
         isAligned,
         sourceEvent,
         needToReport,
         pipeName,
+        creationTime,
         pipeTaskMeta,
         null,
         Long.MIN_VALUE,
@@ -88,28 +91,30 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   @TestOnly
-  public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
-    this(tablet, isAligned, null, false, null, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
+  public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean 
isAligned) {
+    this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
   }
 
   @TestOnly
-  public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, 
PipePattern pattern) {
-    this(tablet, isAligned, null, false, null, null, pattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
+  public PipeRawTabletInsertionEvent(
+      final Tablet tablet, final boolean isAligned, final PipePattern pattern) 
{
+    this(tablet, isAligned, null, false, null, 0, null, pattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @TestOnly
-  public PipeRawTabletInsertionEvent(Tablet tablet, long startTime, long 
endTime) {
-    this(tablet, false, null, false, null, null, null, startTime, endTime);
+  public PipeRawTabletInsertionEvent(
+      final Tablet tablet, final long startTime, final long endTime) {
+    this(tablet, false, null, false, null, 0, null, null, startTime, endTime);
   }
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     allocatedMemoryBlock = 
PipeResourceManager.memory().forceAllocateWithRetry(tablet);
     return true;
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     allocatedMemoryBlock.close();
 
     // Record the deviceId before the memory is released,
@@ -130,7 +135,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   @Override
-  public void bindProgressIndex(ProgressIndex overridingProgressIndex) {
+  public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
     // Normally not all events need to report progress, but if the 
overridingProgressIndex
     // is given, indicating that the progress needs to be reported.
     if (Objects.nonNull(overridingProgressIndex)) {
@@ -152,17 +157,19 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeRawTabletInsertionEvent(
         tablet,
         isAligned,
         sourceEvent,
         needToReport,
         pipeName,
+        creationTime,
         pipeTaskMeta,
         pattern,
         startTime,
@@ -196,7 +203,8 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
-  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
+  public Iterable<TabletInsertionEvent> processRowByRow(
+      final BiConsumer<Row, RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer =
           new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, pipePattern);
@@ -205,7 +213,8 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   @Override
-  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(
+      final BiConsumer<Tablet, RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer =
           new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, pipePattern);
@@ -241,7 +250,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
     return new PipeRawTabletInsertionEvent(
-        convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, 
needToReport);
+        convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta, 
this, needToReport);
   }
 
   public boolean hasNoNeedParsingAndIsEmpty() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index c1252fe668f..cc423f1bf3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -38,8 +38,11 @@ public class PipeTerminateEvent extends EnrichedEvent {
   private final int dataRegionId;
 
   public PipeTerminateEvent(
-      final String pipeName, final PipeTaskMeta pipeTaskMeta, final int 
dataRegionId) {
-    super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final int dataRegionId) {
+    super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
   }
 
@@ -61,13 +64,14 @@ public class PipeTerminateEvent extends EnrichedEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
     // Should record PipeTaskMeta, for the terminateEvent shall report 
progress to
     // notify the pipeTask it's completed.
-    return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+    return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, 
dataRegionId);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 55739bf8f32..a7c0cc5c255 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -59,7 +59,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private TsFileInsertionDataContainer dataContainer;
 
   public PipeTsFileInsertionEvent(
-      TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
+      final TsFileResource resource, final boolean isLoaded, final boolean 
isGeneratedByPipe) {
     // The modFile must be copied before the event is assigned to the 
listening pipes
     this(
         resource,
@@ -67,6 +67,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
         isLoaded,
         isGeneratedByPipe,
         null,
+        0,
         null,
         null,
         Long.MIN_VALUE,
@@ -74,16 +75,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   }
 
   public PipeTsFileInsertionEvent(
-      TsFileResource resource,
-      boolean isWithMod,
-      boolean isLoaded,
-      boolean isGeneratedByPipe,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
-    super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
+      final TsFileResource resource,
+      final boolean isWithMod,
+      final boolean isLoaded,
+      final boolean isGeneratedByPipe,
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
+    super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
 
     this.resource = resource;
     tsFile = resource.getTsFile();
@@ -186,14 +188,14 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile, 
true, resource);
       if (isWithMod) {
         modFile = PipeResourceManager.tsfile().increaseFileReference(modFile, 
false, null);
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Increase reference count for TsFile %s or modFile %s error. 
Holder Message: %s",
@@ -204,14 +206,14 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       PipeResourceManager.tsfile().decreaseFileReference(tsFile);
       if (isWithMod) {
         PipeResourceManager.tsfile().decreaseFileReference(modFile);
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Decrease reference count for TsFile %s error. Holder Message: 
%s",
@@ -231,7 +233,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
         return MinimumProgressIndex.INSTANCE;
       }
       return resource.getMaxProgressIndexAfterClose();
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       LOGGER.warn(
           String.format(
               "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath()));
@@ -242,17 +244,19 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
 
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeTsFileInsertionEvent(
         resource,
         isWithMod,
         isLoaded,
         isGeneratedByPipe,
         pipeName,
+        creationTime,
         pipeTaskMeta,
         pattern,
         startTime,
@@ -305,7 +309,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
         return Collections.emptyList();
       }
       return initDataContainer().toTabletInsertionEvents();
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       close();
 
@@ -325,7 +329,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
                 tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
       }
       return dataContainer;
-    } catch (IOException e) {
+    } catch (final IOException e) {
       close();
 
       final String errorMsg = String.format("Read TsFile %s error.", 
resource.getTsFilePath());
@@ -334,7 +338,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     }
   }
 
-  public long count(boolean skipReportOnCommit) throws IOException {
+  public long count(final boolean skipReportOnCommit) throws IOException {
     long count = 0;
 
     if (shouldParseTime()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 36dfc3df458..03e61cf2809 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -252,6 +252,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
                       tablet,
                       isAligned,
                       sourceEvent != null ? sourceEvent.getPipeName() : null,
+                      sourceEvent != null ? sourceEvent.getCreationTime() : 0,
                       pipeTaskMeta,
                       sourceEvent,
                       true);
@@ -262,6 +263,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
                       tablet,
                       isAligned,
                       sourceEvent != null ? sourceEvent.getPipeName() : null,
+                      sourceEvent != null ? sourceEvent.getCreationTime() : 0,
                       pipeTaskMeta,
                       sourceEvent,
                       false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 77df1f0292b..ce8e7d9f0fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -58,7 +58,13 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     // PipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
     // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
-    super(event != null ? event.getPipeName() : null, pipeTaskMeta, pattern, 
startTime, endTime);
+    super(
+        event != null ? event.getPipeName() : null,
+        event != null ? event.getCreationTime() : 0,
+        pipeTaskMeta,
+        pattern,
+        startTime,
+        endTime);
 
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
@@ -140,13 +146,14 @@ public class PipeRealtimeEvent extends EnrichedEvent {
   @Override
   public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
     return new PipeRealtimeEvent(
         event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-            pipeName, pipeTaskMeta, pattern, startTime, endTime),
+            pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime),
         this.tsFileEpoch,
         this.device2Measurements,
         pipeTaskMeta,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index b1e2649e6db..237dace7c02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -94,6 +94,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
 
   private String pipeName;
+  private long creationTime;
+
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
 
@@ -250,6 +252,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
         (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
     pipeName = environment.getPipeName();
+    creationTime = environment.getCreationTime();
     pipeTaskMeta = environment.getPipeTaskMeta();
     startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
@@ -563,7 +566,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     if (resource == null) {
       isTerminateSignalSent = true;
       final PipeTerminateEvent terminateEvent =
-          new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+          new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, 
dataRegionId);
       terminateEvent.increaseReferenceCount(
           PipeHistoricalDataRegionTsFileExtractor.class.getName());
       return terminateEvent;
@@ -576,6 +579,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             false,
             false,
             pipeName,
+            creationTime,
             pipeTaskMeta,
             pipePattern,
             historicalDataExtractionStartTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c4bec4246e4..c26db376da0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -74,6 +74,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       LoggerFactory.getLogger(PipeRealtimeDataRegionExtractor.class);
 
   protected String pipeName;
+  protected long creationTime;
   protected String dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
@@ -167,7 +168,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     // indexed by the taskID of IoTDBDataRegionExtractor. To avoid 
PipeRealtimeDataRegionExtractor
     // holding a reference to IoTDBDataRegionExtractor, the taskID should be 
constructed to
     // match that of IoTDBDataRegionExtractor.
-    final long creationTime = environment.getCreationTime();
+    creationTime = environment.getCreationTime();
     taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
 
     pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
@@ -387,6 +388,10 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     return pipeName;
   }
 
+  public final long getCreationTime() {
+    return creationTime;
+  }
+
   public final PipeTaskMeta getPipeTaskMeta() {
     return pipeTaskMeta;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e85fd5b0a1d..16f9ebf61d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -76,6 +76,7 @@ public class PipeDataRegionAssigner implements Closeable {
               final PipeRealtimeEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
                       extractor.getPipeName(),
+                      extractor.getCreationTime(),
                       extractor.getPipeTaskMeta(),
                       extractor.getPipePattern(),
                       extractor.getRealtimeDataExtractionStartTime(),
@@ -90,7 +91,6 @@ public class PipeDataRegionAssigner implements Closeable {
               extractor.extract(copiedEvent);
 
               if (innerEvent instanceof PipeHeartbeatEvent) {
-                ((PipeHeartbeatEvent) 
innerEvent).bindPipeName(extractor.getPipeName());
                 ((PipeHeartbeatEvent) innerEvent).onAssigned();
               }
             });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index c1ecb8460b4..8f9008f5d9a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -19,17 +19,12 @@
 
 package org.apache.iotdb.db.pipe.metric;
 
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
-import org.apache.iotdb.db.schemaengine.SchemaEngine;
-import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -151,6 +146,26 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
+  public void thawRate(final String pipeID) {
+    if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
+      // On a DataNode, "thawRate" may be called from "startPipe" while the pipe has no
+      // subtasks yet.
+      // In that case the rate is thawed later, in "startPipeTask".
+      return;
+    }
+    remainingEventAndTimeOperatorMap.get(pipeID).thawRate(true);
+  }
+
+  public void freezeRate(final String pipeID) {
+    if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
+      // On a DataNode, "freezeRate" may be called from "stopPipe" (following an earlier
+      // "startPipe") while the pipe has no subtasks.
+      // Nothing to do: in that case the rate was never thawed in the first place.
+      return;
+    }
+    remainingEventAndTimeOperatorMap.get(pipeID).freezeRate(true);
+  }
+
   public void deregister(final String pipeID) {
     if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
       LOGGER.warn(
@@ -163,13 +178,7 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
-  public void markRegionCommit(final PipeTaskRuntimeEnvironment 
pipeTaskRuntimeEnvironment) {
-    // Filter commit attempt from assigner
-    final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
-    final int regionId = pipeTaskRuntimeEnvironment.getRegionId();
-    final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
-    final String pipeID = pipeName + "_" + creationTime;
-
+  public void markRegionCommit(final String pipeID, final boolean 
isDataRegion) {
     if (Objects.isNull(metricService)) {
       return;
     }
@@ -181,19 +190,10 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
           pipeID);
       return;
     }
-    // Prevent not set pipeName / creation times & potential differences 
between pipeNames and
-    // creation times
-    if (!Objects.equals(pipeName, operator.getPipeName())
-        || !Objects.equals(creationTime, operator.getCreationTime())) {
-      return;
-    }
 
-    // Prevent empty region-ids
-    if (StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))) {
+    if (isDataRegion) {
       operator.markDataRegionCommit();
-    }
-
-    if (SchemaEngine.getInstance().getAllSchemaRegionIds().contains(new 
SchemaRegionId(regionId))) {
+    } else {
       operator.markSchemaRegionCommit();
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index f7a08295f40..dcab93d535c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -19,61 +19,49 @@
 
 package org.apache.iotdb.db.pipe.metric;
 
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
 
 import com.codahale.metrics.Clock;
 import com.codahale.metrics.ExponentialMovingAverages;
 import com.codahale.metrics.Meter;
 
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
-class PipeDataNodeRemainingEventAndTimeOperator {
+class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
+  private final Set<IoTDBDataRegionExtractor> dataRegionExtractors =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final Set<PipeConnectorSubtask> dataRegionConnectors =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final AtomicReference<Meter> dataRegionCommitMeter = new 
AtomicReference<>(null);
+  private final AtomicReference<Meter> schemaRegionCommitMeter = new 
AtomicReference<>(null);
 
-  private static final long DATA_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60 * 
60L; // 1 year
-
-  private String pipeName;
-  private long creationTime = 0;
-
-  private final ConcurrentMap<IoTDBDataRegionExtractor, 
IoTDBDataRegionExtractor>
-      dataRegionExtractors = new ConcurrentHashMap<>();
-  private final ConcurrentMap<PipeConnectorSubtask, PipeConnectorSubtask> 
dataRegionConnectors =
-      new ConcurrentHashMap<>();
-  private final ConcurrentMap<IoTDBSchemaRegionExtractor, 
IoTDBSchemaRegionExtractor>
-      schemaRegionExtractors = new ConcurrentHashMap<>();
-  private final Meter dataRegionCommitMeter =
-      new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
-  private final Meter schemaRegionCommitMeter =
-      new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
-
-  private double lastDataRegionCommitSmoothingValue = Long.MIN_VALUE;
-  private double lastSchemaRegionCommitSmoothingValue = Long.MIN_VALUE;
-
-  //////////////////////////// Tags ////////////////////////////
-
-  String getPipeName() {
-    return pipeName;
-  }
-
-  long getCreationTime() {
-    return creationTime;
-  }
+  private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
+  private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
 
   //////////////////////////// Remaining event & time calculation 
////////////////////////////
 
   long getRemainingEvents() {
-    return dataRegionExtractors.keySet().stream()
+    return dataRegionExtractors.stream()
             .map(IoTDBDataRegionExtractor::getEventCount)
             .reduce(Integer::sum)
             .orElse(0)
-        + dataRegionConnectors.keySet().stream()
+        + dataRegionConnectors.stream()
             .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
             .reduce(Integer::sum)
             .orElse(0)
-        + schemaRegionExtractors.keySet().stream()
+        + schemaRegionExtractors.stream()
             .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
             .reduce(Long::sum)
             .orElse(0L);
@@ -82,39 +70,41 @@ class PipeDataNodeRemainingEventAndTimeOperator {
   /**
    * This will calculate the estimated remaining time of pipe.
    *
-   * <p>Note: The events in pipe assigner are omitted.
+   * <p>Note: The {@link Event}s in pipe assigner are omitted.
    *
    * @return The estimated remaining time
    */
   double getRemainingTime() {
-    final double pipeRemainingTimeCommitRateSmoothingFactor =
-        
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
+    final PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
+        PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
 
     // Do not take heartbeat event into account
     final int totalDataRegionWriteEventCount =
-        dataRegionExtractors.keySet().stream()
+        dataRegionExtractors.stream()
                 .map(IoTDBDataRegionExtractor::getEventCount)
                 .reduce(Integer::sum)
                 .orElse(0)
-            + dataRegionConnectors.keySet().stream()
+            + dataRegionConnectors.stream()
                 .map(connectorSubtask -> 
connectorSubtask.getEventCount(pipeName))
                 .reduce(Integer::sum)
                 .orElse(0)
-            - dataRegionExtractors.keySet().stream()
+            - dataRegionExtractors.stream()
                 .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
                 .reduce(Integer::sum)
                 .orElse(0)
-            - dataRegionConnectors.keySet().stream()
+            - dataRegionConnectors.stream()
                 .map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
                 .reduce(Integer::sum)
                 .orElse(0);
 
-    lastDataRegionCommitSmoothingValue =
-        lastDataRegionCommitSmoothingValue == Long.MIN_VALUE
-            ? dataRegionCommitMeter.getOneMinuteRate()
-            : pipeRemainingTimeCommitRateSmoothingFactor * 
dataRegionCommitMeter.getOneMinuteRate()
-                + (1 - pipeRemainingTimeCommitRateSmoothingFactor)
-                    * lastDataRegionCommitSmoothingValue;
+    dataRegionCommitMeter.updateAndGet(
+        meter -> {
+          if (Objects.nonNull(meter)) {
+            lastDataRegionCommitSmoothingValue =
+                pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+          }
+          return meter;
+        });
     final double dataRegionRemainingTime;
     if (totalDataRegionWriteEventCount <= 0) {
       dataRegionRemainingTime = 0;
@@ -126,18 +116,19 @@ class PipeDataNodeRemainingEventAndTimeOperator {
     }
 
     final long totalSchemaRegionWriteEventCount =
-        schemaRegionExtractors.keySet().stream()
+        schemaRegionExtractors.stream()
             .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
             .reduce(Long::sum)
             .orElse(0L);
 
-    lastSchemaRegionCommitSmoothingValue =
-        lastSchemaRegionCommitSmoothingValue == Long.MIN_VALUE
-            ? schemaRegionCommitMeter.getOneMinuteRate()
-            : pipeRemainingTimeCommitRateSmoothingFactor
-                    * schemaRegionCommitMeter.getOneMinuteRate()
-                + (1 - pipeRemainingTimeCommitRateSmoothingFactor)
-                    * lastSchemaRegionCommitSmoothingValue;
+    schemaRegionCommitMeter.updateAndGet(
+        meter -> {
+          if (Objects.nonNull(meter)) {
+            lastSchemaRegionCommitSmoothingValue =
+                pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+          }
+          return meter;
+        });
     final double schemaRegionRemainingTime;
     if (totalSchemaRegionWriteEventCount <= 0) {
       schemaRegionRemainingTime = 0;
@@ -148,40 +139,77 @@ class PipeDataNodeRemainingEventAndTimeOperator {
               : totalSchemaRegionWriteEventCount / 
lastSchemaRegionCommitSmoothingValue;
     }
 
+    if (totalDataRegionWriteEventCount + totalSchemaRegionWriteEventCount == 
0) {
+      notifyEmpty();
+    } else {
+      notifyNonEmpty();
+    }
+
     final double result = Math.max(dataRegionRemainingTime, 
schemaRegionRemainingTime);
-    return result >= DATA_NODE_REMAINING_MAX_SECONDS ? 
DATA_NODE_REMAINING_MAX_SECONDS : result;
+    return result >= REMAINING_MAX_SECONDS ? REMAINING_MAX_SECONDS : result;
   }
 
   //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
 
   void register(final IoTDBDataRegionExtractor extractor) {
     setNameAndCreationTime(extractor.getPipeName(), 
extractor.getCreationTime());
-    dataRegionExtractors.put(extractor, extractor);
+    dataRegionExtractors.add(extractor);
   }
 
   void register(
       final PipeConnectorSubtask connectorSubtask, final String pipeName, 
final long creationTime) {
     setNameAndCreationTime(pipeName, creationTime);
-    dataRegionConnectors.put(connectorSubtask, connectorSubtask);
+    dataRegionConnectors.add(connectorSubtask);
   }
 
   void register(final IoTDBSchemaRegionExtractor extractor) {
     setNameAndCreationTime(extractor.getPipeName(), 
extractor.getCreationTime());
-    schemaRegionExtractors.put(extractor, extractor);
-  }
-
-  private void setNameAndCreationTime(final String pipeName, final long 
creationTime) {
-    this.pipeName = pipeName;
-    this.creationTime = creationTime;
+    schemaRegionExtractors.add(extractor);
   }
 
   //////////////////////////// Rate ////////////////////////////
 
   void markDataRegionCommit() {
-    dataRegionCommitMeter.mark();
+    dataRegionCommitMeter.updateAndGet(
+        meter -> {
+          if (Objects.nonNull(meter)) {
+            meter.mark();
+          }
+          return meter;
+        });
   }
 
   void markSchemaRegionCommit() {
-    schemaRegionCommitMeter.mark();
+    schemaRegionCommitMeter.updateAndGet(
+        meter -> {
+          if (Objects.nonNull(meter)) {
+            meter.mark();
+          }
+          return meter;
+        });
+  }
+
+  //////////////////////////// Switch ////////////////////////////
+
+  // Thread-safe & Idempotent
+  @Override
+  public synchronized void thawRate(final boolean isStartPipe) {
+    super.thawRate(isStartPipe);
+    // The stopped pipe's rate should only be thawed by "startPipe" command
+    if (isStopped) {
+      return;
+    }
+    dataRegionCommitMeter.compareAndSet(
+        null, new Meter(new ExponentialMovingAverages(), 
Clock.defaultClock()));
+    schemaRegionCommitMeter.compareAndSet(
+        null, new Meter(new ExponentialMovingAverages(), 
Clock.defaultClock()));
+  }
+
+  // Thread-safe & Idempotent
+  @Override
+  public synchronized void freezeRate(final boolean isStopPipe) {
+    super.freezeRate(isStopPipe);
+    dataRegionCommitMeter.set(null);
+    schemaRegionCommitMeter.set(null);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
index 2017051866f..07f952277aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -253,7 +253,7 @@ public class TwoStageCountProcessor implements 
PipeProcessor {
       tablet.addValue(outputSeries.getMeasurement(), 0, 
timestampCountPair.right);
 
       eventCollector.collect(
-          new PipeRawTabletInsertionEvent(tablet, false, null, null, null, 
false));
+          new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, 
false));
 
       PipeCombineHandlerManager.getInstance()
           .updateLastCombinedValue(pipeName, creationTime, timestampCountPair);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 3055568aadf..fd673fe9b95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -142,6 +142,7 @@ public class PipeEventCollector implements EventCollector {
                 new PipeSchemaRegionWritePlanEvent(
                     planNode,
                     deleteDataEvent.getPipeName(),
+                    deleteDataEvent.getCreationTime(),
                     deleteDataEvent.getPipeTaskMeta(),
                     deleteDataEvent.getPipePattern(),
                     deleteDataEvent.isGeneratedByPipe()))
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 5e5a1d251dc..e75e207d030 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -95,6 +95,10 @@
             <artifactId>metrics-core</artifactId>
             <version>1.3.3-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1de7f33df3a..c21f853b8a7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.conf;
 import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 
@@ -235,7 +236,9 @@ public class CommonConfig {
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
-  private double pipeRemainingTimeCommitRateSmoothingFactor = 0.5;
+  private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
+  private PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
+      PipeRemainingTimeRateAverageTime.MEAN;
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -1020,13 +1023,23 @@ public class CommonConfig {
     this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
   }
 
-  public double getPipeRemainingTimeCommitRateSmoothingFactor() {
-    return pipeRemainingTimeCommitRateSmoothingFactor;
+  public long getPipeRemainingTimeCommitRateAutoSwitchSeconds() {
+    return pipeRemainingTimeCommitRateAutoSwitchSeconds;
   }
 
-  public void setPipeRemainingTimeCommitRateSmoothingFactor(
-      double pipeRemainingTimeCommitRateSmoothingFactor) {
-    this.pipeRemainingTimeCommitRateSmoothingFactor = 
pipeRemainingTimeCommitRateSmoothingFactor;
+  public void setPipeRemainingTimeCommitRateAutoSwitchSeconds(
+      long pipeRemainingTimeCommitRateAutoSwitchSeconds) {
+    this.pipeRemainingTimeCommitRateAutoSwitchSeconds =
+        pipeRemainingTimeCommitRateAutoSwitchSeconds;
+  }
+
+  public PipeRemainingTimeRateAverageTime 
getPipeRemainingTimeCommitRateAverageTime() {
+    return pipeRemainingTimeCommitRateAverageTime;
+  }
+
+  public void setPipeRemainingTimeCommitRateAverageTime(
+      PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime) 
{
+    this.pipeRemainingTimeCommitRateAverageTime = 
pipeRemainingTimeCommitRateAverageTime;
   }
 
   public double getPipeAllSinksRateLimitBytesPerSecond() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f72ea5e528d..67718e1224c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.conf;
 
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 
@@ -541,11 +542,18 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_snapshot_execution_max_batch_size",
                 
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
-    config.setPipeRemainingTimeCommitRateSmoothingFactor(
-        Double.parseDouble(
+    config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
+        Long.parseLong(
             properties.getProperty(
-                "pipe_remaining_time_commit_rate_smoothing_factor",
-                
String.valueOf(config.getPipeRemainingTimeCommitRateSmoothingFactor()))));
+                "pipe_remaining_time_commit_rate_auto_switch_seconds",
+                
String.valueOf(config.getPipeRemainingTimeCommitRateAutoSwitchSeconds()))));
+    config.setPipeRemainingTimeCommitRateAverageTime(
+        PipeRemainingTimeRateAverageTime.valueOf(
+            properties
+                .getProperty(
+                    "pipe_remaining_time_commit_rate_average_time",
+                    
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
+                .trim()));
 
     config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
new file mode 100644
index 00000000000..25f799d09f6
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.enums;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+import com.codahale.metrics.Meter;
+
+public enum PipeRemainingTimeRateAverageTime {
+  ONE_MINUTE,
+  FIVE_MINUTES,
+  FIFTEEN_MINUTES,
+  MEAN;
+
+  public double getMeterRate(final Meter meter) {
+    switch (this) {
+      case ONE_MINUTE:
+        return meter.getOneMinuteRate();
+      case FIVE_MINUTES:
+        return meter.getFiveMinuteRate();
+      case FIFTEEN_MINUTES:
+        return meter.getFifteenMinuteRate();
+      case MEAN:
+        return meter.getMeanRate();
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "The type %s is not supported in average time of pipe 
remaining time.",
+                
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 65c1a95aaa3..63dd14c46c9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -272,6 +272,9 @@ public abstract class PipeTaskAgent {
         break;
       case STOPPED:
         if (Objects.requireNonNull(statusInAgent) == PipeStatus.RUNNING) {
+          // Only freeze rate for user stopped pipes
+          // Freeze first to get better results in calculation
+          freezeRate(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
           stopPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
         } else {
           throw new IllegalStateException(
@@ -290,6 +293,10 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected abstract void thawRate(final String pipeName, final long 
creationTime);
+
+  protected abstract void freezeRate(final String pipeName, final long 
creationTime);
+
   public TPushPipeMetaRespExceptionMessage handleDropPipe(final String 
pipeName) {
     acquireWriteLock();
     try {
@@ -576,9 +583,11 @@ public abstract class PipeTaskAgent {
         .getConsensusGroupId2TaskMetaMap()
         .values()
         .forEach(PipeTaskMeta::clearExceptionMessages);
+
+    thawRate(pipeName, creationTime);
   }
 
-  protected void stopPipe(final String pipeName, final long creationTime) {
+  private void stopPipe(final String pipeName, final long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) {
@@ -856,6 +865,7 @@ public abstract class PipeTaskAgent {
     final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, 
consensusGroupId);
     if (pipeTask != null) {
       pipeTask.start();
+      thawRate(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 48debcce408..5fa4aa8a812 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config;
 
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,8 +140,12 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
   }
 
-  public double getPipeRemainingTimeCommitRateSmoothingFactor() {
-    return COMMON_CONFIG.getPipeRemainingTimeCommitRateSmoothingFactor();
+  public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
+    return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
+  }
+
+  public PipeRemainingTimeRateAverageTime 
getPipeRemainingTimeCommitRateAverageTime() {
+    return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
   }
 
   /////////////////////////////// Meta Consistency 
///////////////////////////////
@@ -328,8 +333,10 @@ public class PipeConfig {
         getPipeListeningQueueTransferSnapshotThreshold());
     LOGGER.info("PipeSnapshotExecutionMaxBatchSize: {}", 
getPipeSnapshotExecutionMaxBatchSize());
     LOGGER.info(
-        "PipeRemainingTimeCommitRateSmoothingFactor: {}",
-        getPipeRemainingTimeCommitRateSmoothingFactor());
+        "PipeRemainingTimeCommitAutoSwitchSeconds: {}",
+        getPipeRemainingTimeCommitAutoSwitchSeconds());
+    LOGGER.info(
+        "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index b43e37788cd..8de596a5447 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -48,6 +48,7 @@ public abstract class EnrichedEvent implements Event {
   protected final AtomicBoolean isReleased;
 
   protected final String pipeName;
+  protected final long creationTime;
   protected final PipeTaskMeta pipeTaskMeta;
 
   protected String committerKey;
@@ -67,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
 
   protected EnrichedEvent(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pipePattern,
       final long startTime,
@@ -74,6 +76,7 @@ public abstract class EnrichedEvent implements Event {
     referenceCount = new AtomicInteger(0);
     isReleased = new AtomicBoolean(false);
     this.pipeName = pipeName;
+    this.creationTime = creationTime;
     this.pipeTaskMeta = pipeTaskMeta;
     this.pipePattern = pipePattern;
     this.startTime = startTime;
@@ -129,7 +132,7 @@ public abstract class EnrichedEvent implements Event {
    *     {@code false} if the {@link EnrichedEvent} is not controlled by the 
invoker, which means
    *     the data stored in the event is not safe to use
    */
-  public abstract boolean internallyIncreaseResourceReferenceCount(String 
holderMessage);
+  public abstract boolean internallyIncreaseResourceReferenceCount(final 
String holderMessage);
 
   /**
    * Decrease the {@link EnrichedEvent#referenceCount} of this {@link 
EnrichedEvent} by 1. If the
@@ -208,7 +211,7 @@ public abstract class EnrichedEvent implements Event {
    * @return {@code true} if the {@link EnrichedEvent#referenceCount} is 
decreased successfully,
    *     {@code false} otherwise
    */
-  public abstract boolean internallyDecreaseResourceReferenceCount(String 
holderMessage);
+  public abstract boolean internallyDecreaseResourceReferenceCount(final 
String holderMessage);
 
   protected void reportProgress() {
     if (pipeTaskMeta != null) {
@@ -245,6 +248,14 @@ public abstract class EnrichedEvent implements Event {
     return pipeName;
   }
 
+  /**
+   * Returns the {@code creationTime} value that was passed to this event's constructor.
+   *
+   * <p>Presumably this is the creation time of the pipe the event belongs to (confirm against
+   * callers). {@code PipeEventCommitManager#commit} returns early when this value is {@code 0}.
+   *
+   * @return the creation time bound to this event
+   */
+  public final long getCreationTime() {
+    return creationTime;
+  }
+
+  /**
+   * Tells whether this event is treated as a data region event, determined purely by its type.
+   *
+   * @return {@code false} if this event is a {@link PipeWritePlanEvent} or a {@link
+   *     PipeSnapshotEvent}, {@code true} for every other {@link EnrichedEvent} subtype
+   */
+  public final boolean isDataRegionEvent() {
+    return !(this instanceof PipeWritePlanEvent) && !(this instanceof 
PipeSnapshotEvent);
+  }
+
   /**
    * Get the pattern string of this {@link EnrichedEvent}.
    *
@@ -291,11 +302,12 @@ public abstract class EnrichedEvent implements Event {
   }
 
   public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime);
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime);
 
   public PipeTaskMeta getPipeTaskMeta() {
     return pipeTaskMeta;
@@ -315,7 +327,7 @@ public abstract class EnrichedEvent implements Event {
     this.commitId = commitId;
   }
 
-  public void setRebootTimes(int rebootTimes) {
+  public void setRebootTimes(final int rebootTimes) {
     this.rebootTimes = rebootTimes;
   }
 
@@ -345,14 +357,14 @@ public abstract class EnrichedEvent implements Event {
    * Used for pipeConsensus. In PipeConsensus, we only need committerKey, 
commitId and rebootTimes
    * to uniquely identify an event
    */
-  public boolean equalsInPipeConsensus(Object o) {
+  public boolean equalsInPipeConsensus(final Object o) {
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    EnrichedEvent otherEvent = (EnrichedEvent) o;
+    final EnrichedEvent otherEvent = (EnrichedEvent) o;
     return Objects.equals(committerKey, otherEvent.committerKey)
         && commitId == otherEvent.commitId
         && rebootTimes == otherEvent.rebootTimes;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index fa26319e6dc..f130a83c0ee 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -35,10 +35,11 @@ public abstract class PipeSnapshotEvent extends 
EnrichedEvent implements Seriali
 
   protected PipeSnapshotEvent(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final PipeSnapshotResourceManager resourceManager) {
-    super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
+    super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.resourceManager = resourceManager;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index d37aee82aaa..c553e6ce996 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -31,10 +31,11 @@ public abstract class PipeWritePlanEvent extends 
EnrichedEvent implements Serial
 
   protected PipeWritePlanEvent(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final boolean isGeneratedByPipe) {
-    super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
+    super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.isGeneratedByPipe = isGeneratedByPipe;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 2851b03593c..4f00c89c5e6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -34,11 +34,12 @@ public class ProgressReportEvent extends EnrichedEvent {
 
   public ProgressReportEvent(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pipePattern,
       final long startTime,
       final long endTime) {
-    super(pipeName, pipeTaskMeta, pipePattern, startTime, endTime);
+    super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, 
endTime);
   }
 
   @Override
@@ -64,11 +65,13 @@ public class ProgressReportEvent extends EnrichedEvent {
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
+      final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
-    return new ProgressReportEvent(pipeName, pipeTaskMeta, pattern, startTime, 
endTime);
+    return new ProgressReportEvent(
+        pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
index 9d6ce0ffde8..a67e818be98 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
@@ -154,7 +154,12 @@ public abstract class IoTDBNonDataRegionExtractor extends 
IoTDBExtractor {
               historicalEvents
                   .remove(0)
                   .shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                      pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
+                      pipeName,
+                      creationTime,
+                      pipeTaskMeta,
+                      pipePattern,
+                      Long.MIN_VALUE,
+                      Long.MAX_VALUE);
 
       if (historicalEvents.isEmpty()) {
         // We only report progress for the last snapshot event.
@@ -180,7 +185,7 @@ public abstract class IoTDBNonDataRegionExtractor extends 
IoTDBExtractor {
         || (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
       final ProgressReportEvent event =
           new ProgressReportEvent(
-              pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
+              pipeName, creationTime, pipeTaskMeta, pipePattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
       event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 
1));
       
event.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
       return event;
@@ -189,7 +194,7 @@ public abstract class IoTDBNonDataRegionExtractor extends 
IoTDBExtractor {
     realtimeEvent =
         (PipeWritePlanEvent)
             realtimeEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
+                pipeName, creationTime, pipeTaskMeta, pipePattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
     realtimeEvent.bindProgressIndex(new 
MetaProgressIndex(iterator.getNextIndex() - 1));
     
realtimeEvent.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
     return realtimeEvent;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
new file mode 100644
index 00000000000..97f80e5d985
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.metric;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+/**
+ * Base class for per-pipe "remaining" metric operators. It stores the pipe's name and creation
+ * time, a stopped flag, and the timestamps of the most recent "empty" and "non-empty"
+ * notifications.
+ *
+ * <p>{@link #notifyNonEmpty()} calls {@link #thawRate(boolean)} and {@link #notifyEmpty()} calls
+ * {@link #freezeRate(boolean)}, both with {@code false}, once at least {@code
+ * PipeConfig#getPipeRemainingTimeCommitAutoSwitchSeconds()} seconds have elapsed since the last
+ * notification of the opposite kind. In this base class the two rate methods only update
+ * {@link #isStopped}; presumably subclasses override them to pause or resume their own rate
+ * tracking (not visible here).
+ */
+public abstract class PipeRemainingOperator {
+
+  /** Number of seconds in 365 days; presumably a cap for reported remaining time (confirm). */
+  protected static final long REMAINING_MAX_SECONDS = 365 * 24 * 60 * 60L; // 
1 year
+
+  // Pipe identity, assigned through setNameAndCreationTime; creationTime stays 0 until then.
+  protected String pipeName;
+  protected long creationTime = 0;
+
+  // Wall-clock milliseconds of the latest notifyEmpty() / notifyNonEmpty() call. Both are
+  // initialized to the construction time.
+  // NOTE(review): these two fields are accessed without synchronization, whereas isStopped is
+  // written inside synchronized methods. Confirm the threading model of the callers.
+  private long lastEmptyTimeStamp = System.currentTimeMillis();
+  private long lastNonEmptyTimeStamp = System.currentTimeMillis();
+  // Starts as true. Within this class thawRate(true) clears it and freezeRate(true) sets it.
+  protected boolean isStopped = true;
+
+  //////////////////////////// Tags ////////////////////////////
+
+  /** Returns the pipe name set by {@link #setNameAndCreationTime(String, long)}. */
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  /** Returns the creation time set by {@link #setNameAndCreationTime(String, long)}. */
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
+
+  /**
+   * Stores the identity of the pipe this operator belongs to.
+   *
+   * @param pipeName value subsequently returned by {@link #getPipeName()}
+   * @param creationTime value subsequently returned by {@link #getCreationTime()}
+   */
+  protected void setNameAndCreationTime(final String pipeName, final long 
creationTime) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+  }
+
+  //////////////////////////// Switch ////////////////////////////
+
+  /**
+   * Records the current time as the latest "non-empty" timestamp and calls {@code
+   * thawRate(false)} if at least the configured number of auto-switch seconds has elapsed since
+   * the latest "empty" timestamp.
+   */
+  protected void notifyNonEmpty() {
+    final long pipeRemainingTimeCommitAutoSwitchSeconds =
+        PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();
+
+    lastNonEmptyTimeStamp = System.currentTimeMillis();
+    // The threshold is configured in seconds; the timestamps are in milliseconds.
+    if (lastNonEmptyTimeStamp - lastEmptyTimeStamp
+        >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
+      thawRate(false);
+    }
+  }
+
+  /**
+   * Records the current time as the latest "empty" timestamp and calls {@code freezeRate(false)}
+   * if at least the configured number of auto-switch seconds has elapsed since the latest
+   * "non-empty" timestamp.
+   */
+  protected void notifyEmpty() {
+    final long pipeRemainingTimeCommitAutoSwitchSeconds =
+        PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();
+
+    lastEmptyTimeStamp = System.currentTimeMillis();
+    // The threshold is configured in seconds; the timestamps are in milliseconds.
+    if (lastEmptyTimeStamp - lastNonEmptyTimeStamp
+        >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
+      freezeRate(false);
+    }
+  }
+
+  /**
+   * Clears {@link #isStopped} when {@code isStartPipe} is {@code true}; otherwise this base
+   * implementation does nothing.
+   *
+   * @param isStartPipe {@code true} to mark the operator as no longer stopped; the notify hooks
+   *     in this class always pass {@code false}
+   */
+  public synchronized void thawRate(final boolean isStartPipe) {
+    if (isStartPipe) {
+      isStopped = false;
+    }
+  }
+
+  /**
+   * Sets {@link #isStopped} when {@code isStopPipe} is {@code true}; otherwise this base
+   * implementation does nothing.
+   *
+   * @param isStopPipe {@code true} to mark the operator as stopped; the notify hooks in this
+   *     class always pass {@code false}
+   */
+  public synchronized void freezeRate(final boolean isStopPipe) {
+    if (isStopPipe) {
+      isStopped = true;
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 0b740f7e12a..d51f0126612 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.commons.pipe.progress;
 
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
 
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 
 public class PipeEventCommitManager {
 
@@ -38,7 +37,7 @@ public class PipeEventCommitManager {
   // key: pipeName_regionId_creationTime
   private final Map<String, PipeEventCommitter> eventCommitterMap = new 
ConcurrentHashMap<>();
 
-  private Consumer<PipeTaskRuntimeEnvironment> commitRateMarker;
+  private BiConsumer<String, Boolean> commitRateMarker;
 
   public void register(
       final String pipeName,
@@ -88,10 +87,26 @@ public class PipeEventCommitManager {
   }
 
   public void commit(final EnrichedEvent event, final String committerKey) {
-    if (committerKey == null
-        || event == null
+    if (event == null
         || !event.needToCommit()
-        || event.getCommitId() <= EnrichedEvent.NO_COMMIT_ID) {
+        || Objects.isNull(event.getPipeName())
+        || event.getCreationTime() == 0) {
+      return;
+    }
+    if (Objects.nonNull(commitRateMarker)) {
+      try {
+        commitRateMarker.accept(
+            event.getPipeName() + '_' + event.getCreationTime(), 
event.isDataRegionEvent());
+      } catch (final Exception e) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "Failed to mark commit rate for pipe task: {}, stack trace: {}",
+              committerKey,
+              Thread.currentThread().getStackTrace());
+        }
+      }
+    }
+    if (committerKey == null || event.getCommitId() <= 
EnrichedEvent.NO_COMMIT_ID) {
       return;
     }
     final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
@@ -108,20 +123,6 @@ public class PipeEventCommitManager {
     }
 
     committer.commit(event);
-    if (Objects.nonNull(commitRateMarker)) {
-      try {
-        commitRateMarker.accept(
-            new PipeTaskRuntimeEnvironment(
-                committer.getPipeName(), committer.getCreationTime(), 
committer.getRegionId()));
-      } catch (Exception e) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug(
-              "Failed to mark commit rate for pipe: {}, stack trace: {}",
-              committerKey,
-              Thread.currentThread().getStackTrace());
-        }
-      }
-    }
   }
 
   private static String generateCommitterKey(
@@ -129,7 +130,7 @@ public class PipeEventCommitManager {
     return String.format("%s_%s_%s", pipeName, regionId, creationTime);
   }
 
-  public void setCommitRateMarker(final Consumer<PipeTaskRuntimeEnvironment> 
commitRateMarker) {
+  public void setCommitRateMarker(final BiConsumer<String, Boolean> 
commitRateMarker) {
     this.commitRateMarker = commitRateMarker;
   }
 

Reply via email to