This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ffe3400af9 Pipe: Optimized the hybrid switching algorithm (#15528)
2ffe3400af9 is described below

commit 2ffe3400af95d0d413168f1914a83603fe2390c3
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 21 15:33:56 2025 +0800

    Pipe: Optimized the hybrid switching algorithm (#15528)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  11 +-
 .../agent/task/connection/PipeEventCollector.java  |   9 +-
 .../statement/PipeStatementInsertionEvent.java     |   4 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |   4 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |   4 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   2 +
 .../PipeRealtimeDataRegionHybridExtractor.java     | 201 ++++++++++++++-------
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  |  50 ++++-
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  41 ++++-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  62 ++++++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  21 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  16 ++
 .../commons/pipe/resource/log/PipeLogManager.java  |  14 ++
 13 files changed, 344 insertions(+), 95 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f09422fa663..8fd099baa06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -699,7 +699,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || mayWalSizeReachThrottleThreshold())) {
           // Extractors of this pipe may be stuck and is pinning too many 
MemTables.
           LOGGER.warn(
-              "Pipe {} needs to restart because too many memtables are pinned. 
mayMemTablePinnedCountReachDangerousThreshold: {}, 
mayWalSizeReachThrottleThreshold: {}",
+              "Pipe {} needs to restart because too many memTables are pinned 
or the WAL size is too large. mayMemTablePinnedCountReachDangerousThreshold: 
{}, mayWalSizeReachThrottleThreshold: {}",
               pipeMeta.getStaticMeta(),
               mayMemTablePinnedCountReachDangerousThreshold(),
               mayWalSizeReachThrottleThreshold());
@@ -737,10 +737,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private boolean mayMemTablePinnedCountReachDangerousThreshold() {
-    return PipeDataNodeResourceManager.wal().getPinnedWalCount()
-        >= 5
-            * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()
-            * StorageEngine.getInstance().getDataRegionNumber();
+    return PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount() != 
Integer.MAX_VALUE
+        && PipeDataNodeResourceManager.wal().getPinnedWalCount()
+            >= 5
+                * 
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()
+                * StorageEngine.getInstance().getDataRegionNumber();
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index e385a8d1037..4e75fc9cbf2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -116,10 +116,11 @@ public class PipeEventCollector implements EventCollector 
{
   }
 
   private void parseAndCollectEvent(final PipeRawTabletInsertionEvent 
sourceEvent) {
-    collectParsedRawTableEvent(
-        sourceEvent.shouldParseTimeOrPattern()
-            ? sourceEvent.parseEventWithPatternOrTime()
-            : sourceEvent);
+    if (sourceEvent.shouldParseTimeOrPattern()) {
+      collectParsedRawTableEvent(sourceEvent.parseEventWithPatternOrTime());
+    } else {
+      collectEvent(sourceEvent);
+    }
   }
 
   private void parseAndCollectEvent(final PipeTsFileInsertionEvent 
sourceEvent) throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
index 1efe3aa2ac7..990d25eebd1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
@@ -90,7 +90,7 @@ public class PipeStatementInsertionEvent extends 
PipeInsertionEvent
         .forceResize(allocatedMemoryBlock, statement.ramBytesUsed() + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .increaseTabletEventCount(pipeName, creationTime);
+          .increaseRawTabletEventCount(pipeName, creationTime);
     }
     return true;
   }
@@ -99,7 +99,7 @@ public class PipeStatementInsertionEvent extends 
PipeInsertionEvent
   public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .decreaseTabletEventCount(pipeName, creationTime);
+          .decreaseRawTabletEventCount(pipeName, creationTime);
     }
     allocatedMemoryBlock.close();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 598eeaed0da..0b8dc056fae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -202,7 +202,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       PipeDataNodeResourceManager.wal().pin(walEntryHandler);
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .increaseTabletEventCount(pipeName, creationTime);
+            .increaseInsertNodeEventCount(pipeName, creationTime);
         PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
       }
       return true;
@@ -238,7 +238,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .decreaseTabletEventCount(pipeName, creationTime);
+            .decreaseInsertNodeEventCount(pipeName, creationTime);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index b2a6fc74922..5f29402bd36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -236,7 +236,7 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
             PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .increaseTabletEventCount(pipeName, creationTime);
+          .increaseRawTabletEventCount(pipeName, creationTime);
     }
     return true;
   }
@@ -245,7 +245,7 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .decreaseTabletEventCount(pipeName, creationTime);
+          .decreaseRawTabletEventCount(pipeName, creationTime);
     }
     allocatedMemoryBlock.close();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index e07385fd813..1b5f05fc8fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -125,6 +125,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(false);
 
+  protected String pipeID;
   private String taskID;
   protected String userName;
   protected boolean skipIfNoPrivileges = true;
@@ -214,6 +215,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     // holding a reference to IoTDBDataRegionExtractor, the taskID should be 
constructed to
     // match that of IoTDBDataRegionExtractor.
     creationTime = environment.getCreationTime();
+    pipeID = pipeName + "_" + creationTime;
     taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
 
     treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index ebcd71626ab..52de019f741 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -43,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
+import java.util.function.Consumer;
 
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
 
@@ -80,24 +82,24 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private void extractTabletInsertion(final PipeRealtimeEvent event) {
-    TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    TsFileEpoch.State state;
 
-    if (state != TsFileEpoch.State.USING_TSFILE
-        && state != TsFileEpoch.State.USING_BOTH
-        && canNotUseTabletAnyMore(event)) {
+    if (canNotUseTabletAnyMore(event)) {
+      event.getTsFileEpoch().migrateState(this, curState -> 
TsFileEpoch.State.USING_TSFILE);
+    } else {
       event
           .getTsFileEpoch()
           .migrateState(
               this,
               curState -> {
                 switch (curState) {
-                  case EMPTY:
+                  case USING_BOTH:
                   case USING_TSFILE:
-                    return TsFileEpoch.State.USING_TSFILE;
+                    return TsFileEpoch.State.USING_BOTH;
+                  case EMPTY:
                   case USING_TABLET:
-                  case USING_BOTH:
                   default:
-                    return TsFileEpoch.State.USING_BOTH;
+                    return TsFileEpoch.State.USING_TABLET;
                 }
               });
     }
@@ -111,6 +113,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       case EMPTY:
       case USING_TABLET:
       case USING_BOTH:
+        // USING_BOTH indicates that there are discarded events previously.
+        // In this case, we need to delay the progress report to tsFile event, 
to avoid losing data.
+        if (state == TsFileEpoch.State.USING_BOTH) {
+          event.skipReportOnCommit();
+        }
         if (!pendingQueue.waitedOffer(event)) {
           // This would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
@@ -169,7 +176,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                   }
                 case USING_BOTH:
                 default:
-                  return TsFileEpoch.State.USING_BOTH;
+                  return canNotUseTabletAnyMore(event)
+                      ? TsFileEpoch.State.USING_TSFILE
+                      : TsFileEpoch.State.USING_BOTH;
               }
             });
 
@@ -208,23 +217,98 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) {
-    // In the following 7 cases, we should not extract any more tablet events. 
all the data
-    // represented by the tablet events should be carried by the following 
tsfile event:
-    //  0. If the pipe task is currently restarted.
+    // In the following 4 cases, we should not extract this tablet event. all 
the data
+    // represented by the tablet event should be carried by the following 
tsfile event:
+    //  0. If the remaining insert event count is too large, we need to reduce 
the accumulated
+    // tablets.
     //  1. If Wal size > maximum size of wal buffer,
     //  the write operation will be throttled, so we should not extract any 
more tablet events.
-    //  2. The number of pinned memtables has reached the dangerous threshold.
-    //  3. The number of historical tsFile events to transfer has exceeded the 
limit.
-    //  4. The number of realtime tsfile events to transfer has exceeded the 
limit.
-    //  5. The number of linked tsfiles has reached the dangerous threshold.
-    //  6. The shallow memory usage of the insert node has reached the 
dangerous threshold.
-    return isPipeTaskCurrentlyRestarted(event)
+    //  2. The shallow memory usage of the insert node has reached the 
dangerous threshold.
+    //  3. Deprecated logics (unused by default)
+    return mayRemainingInsertNodeEventExceedLimit(event)
         || mayWalSizeReachThrottleThreshold(event)
+        || mayInsertNodeMemoryReachDangerousThreshold(event)
+        || canNotUseTabletAnymoreDeprecated(event);
+  }
+
+  private boolean mayRemainingInsertNodeEventExceedLimit(final 
PipeRealtimeEvent event) {
+    final boolean mayRemainingInsertEventExceedLimit =
+        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            .mayRemainingInsertEventExceedLimit(pipeID);
+    if (mayRemainingInsertEventExceedLimit && 
event.mayExtractorUseTablets(this)) {
+      logByLogManager(
+          l ->
+              l.info(
+                  "Pipe task {}@{} canNotUseTabletAnyMore(0): remaining insert 
event has reached max allowed insert event count {}",
+                  pipeName,
+                  dataRegionId,
+                  
PipeConfig.getInstance().getPipeMaxAllowedRemainingInsertEventCountPerPipe()));
+    }
+    return mayRemainingInsertEventExceedLimit;
+  }
+
+  private boolean mayWalSizeReachThrottleThreshold(final PipeRealtimeEvent 
event) {
+    final boolean mayWalSizeReachThrottleThreshold =
+        3 * WALManager.getInstance().getTotalDiskUsage()
+            > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+    if (mayWalSizeReachThrottleThreshold && 
event.mayExtractorUseTablets(this)) {
+      logByLogManager(
+          l ->
+              l.info(
+                  "Pipe task {}@{} canNotUseTabletAnyMore(1): Wal size {} has 
reached throttle threshold {}",
+                  pipeName,
+                  dataRegionId,
+                  WALManager.getInstance().getTotalDiskUsage(),
+                  
IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() / 3.0d));
+    }
+    return mayWalSizeReachThrottleThreshold;
+  }
+
+  private boolean mayInsertNodeMemoryReachDangerousThreshold(final 
PipeRealtimeEvent event) {
+    final long floatingMemoryUsageInByte =
+        PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
+    final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
+    final long totalFloatingMemorySizeInBytes =
+        
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
+    final boolean mayInsertNodeMemoryReachDangerousThreshold =
+        3 * floatingMemoryUsageInByte * pipeCount >= 2 * 
totalFloatingMemorySizeInBytes;
+    if (mayInsertNodeMemoryReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
+      logByLogManager(
+          l ->
+              l.info(
+                  "Pipe task {}@{} canNotUseTabletAnyMore(2): The shallow 
memory usage of the insert node {} has reached the dangerous threshold {}",
+                  pipeName,
+                  dataRegionId,
+                  floatingMemoryUsageInByte * pipeCount,
+                  2 * totalFloatingMemorySizeInBytes / 3.0d));
+    }
+    return mayInsertNodeMemoryReachDangerousThreshold;
+  }
+
+  private void logByLogManager(final Consumer<Logger> infoFunction) {
+    PipeDataNodeResourceManager.log()
+        .schedule(
+            PipeRealtimeDataRegionHybridExtractor.class, getTaskID(), 
Integer.MAX_VALUE, 100, 1)
+        .ifPresent(infoFunction);
+  }
+
+  /**
+   * These judgements are deprecated, and are only reserved for manual 
operation and compatibility.
+   */
+  @Deprecated
+  private boolean canNotUseTabletAnymoreDeprecated(final PipeRealtimeEvent 
event) {
+    // In the following 5 cases, we should not extract any more tablet events. 
all the data
+    // represented by the tablet events should be carried by the following 
tsfile event:
+    //  0. If the pipe task is currently restarted.
+    //  1. The number of pinned memTables has reached the dangerous threshold.
+    //  2. The number of historical tsFile events to transfer has exceeded the 
limit.
+    //  3. The number of realtime tsfile events to transfer has exceeded the 
limit.
+    //  4. The number of linked tsFiles has reached the dangerous threshold.
+    return isPipeTaskCurrentlyRestarted(event)
         || mayMemTablePinnedCountReachDangerousThreshold(event)
         || isHistoricalTsFileEventCountExceededLimit(event)
         || isRealtimeTsFileEventCountExceededLimit(event)
-        || mayTsFileLinkedCountReachDangerousThreshold(event)
-        || mayInsertNodeMemoryReachDangerousThreshold(event);
+        || mayTsFileLinkedCountReachDangerousThreshold(event);
   }
 
   private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
@@ -236,36 +320,24 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
         PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
     if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore1: Pipe task is currently 
restarted",
+          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(0): Pipe task is 
currently restarted",
           pipeName,
           dataRegionId);
     }
     return isPipeTaskCurrentlyRestarted;
   }
 
-  private boolean mayWalSizeReachThrottleThreshold(final PipeRealtimeEvent 
event) {
-    final boolean mayWalSizeReachThrottleThreshold =
-        3 * WALManager.getInstance().getTotalDiskUsage()
-            > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
-    if (mayWalSizeReachThrottleThreshold && 
event.mayExtractorUseTablets(this)) {
-      LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore2: Wal size {} has reached 
throttle threshold {}",
-          pipeName,
-          dataRegionId,
-          WALManager.getInstance().getTotalDiskUsage(),
-          IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() / 
3.0d);
-    }
-    return mayWalSizeReachThrottleThreshold;
-  }
-
   private boolean mayMemTablePinnedCountReachDangerousThreshold(final 
PipeRealtimeEvent event) {
+    if (PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount() == 
Integer.MAX_VALUE) {
+      return false;
+    }
     final boolean mayMemTablePinnedCountReachDangerousThreshold =
         PipeDataNodeResourceManager.wal().getPinnedWalCount()
             >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()
                 * StorageEngine.getInstance().getDataRegionNumber();
     if (mayMemTablePinnedCountReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore3: The number of pinned 
memtables {} has reached the dangerous threshold {}",
+          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1): The number of 
pinned memTables {} has reached the dangerous threshold {}",
           pipeName,
           dataRegionId,
           PipeDataNodeResourceManager.wal().getPinnedWalCount(),
@@ -276,6 +348,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private boolean isHistoricalTsFileEventCountExceededLimit(final 
PipeRealtimeEvent event) {
+    if 
(PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()
+        == Integer.MAX_VALUE) {
+      return false;
+    }
     final IoTDBDataRegionExtractor extractor =
         
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
     final boolean isHistoricalTsFileEventCountExceededLimit =
@@ -284,7 +360,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                 >= 
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
     if (isHistoricalTsFileEventCountExceededLimit && 
event.mayExtractorUseTablets(this)) {
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore4: The number of historical 
tsFile events {} has exceeded the limit {}",
+          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2): The number of 
historical tsFile events {} has exceeded the limit {}",
           pipeName,
           dataRegionId,
           extractor.getHistoricalTsFileInsertionEventCount(),
@@ -294,12 +370,16 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean isRealtimeTsFileEventCountExceededLimit(final 
PipeRealtimeEvent event) {
+    if 
(PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()
+        == Integer.MAX_VALUE) {
+      return false;
+    }
     final boolean isRealtimeTsFileEventCountExceededLimit =
         pendingQueue.getTsFileInsertionEventCount()
             >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
     if (isRealtimeTsFileEventCountExceededLimit && 
event.mayExtractorUseTablets(this)) {
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore5: The number of realtime 
tsFile events {} has exceeded the limit {}",
+          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3): The number of 
realtime tsFile events {} has exceeded the limit {}",
           pipeName,
           dataRegionId,
           pendingQueue.getTsFileInsertionEventCount(),
@@ -309,12 +389,15 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean mayTsFileLinkedCountReachDangerousThreshold(final 
PipeRealtimeEvent event) {
+    if (PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount() == 
Long.MAX_VALUE) {
+      return false;
+    }
     final boolean mayTsFileLinkedCountReachDangerousThreshold =
         PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
             >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
     if (mayTsFileLinkedCountReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore6: The number of linked 
tsfiles {} has reached the dangerous threshold {}",
+          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(4): The number of 
linked tsFiles {} has reached the dangerous threshold {}",
           pipeName,
           dataRegionId,
           PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount(),
@@ -323,25 +406,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return mayTsFileLinkedCountReachDangerousThreshold;
   }
 
-  private boolean mayInsertNodeMemoryReachDangerousThreshold(final 
PipeRealtimeEvent event) {
-    final long floatingMemoryUsageInByte =
-        PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
-    final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
-    final long totalFloatingMemorySizeInBytes =
-        
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
-    final boolean mayInsertNodeMemoryReachDangerousThreshold =
-        3 * floatingMemoryUsageInByte * pipeCount >= 2 * 
totalFloatingMemorySizeInBytes;
-    if (mayInsertNodeMemoryReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
-      LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage 
of the insert node {} has reached the dangerous threshold {}",
-          pipeName,
-          dataRegionId,
-          floatingMemoryUsageInByte * pipeCount,
-          2 * totalFloatingMemorySizeInBytes / 3.0d);
-    }
-    return mayInsertNodeMemoryReachDangerousThreshold;
-  }
-
   @Override
   public Event supply() {
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();
@@ -387,13 +451,20 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
         .migrateState(
             this,
             state -> {
-              if (!state.equals(TsFileEpoch.State.EMPTY)) {
-                return state;
+              switch (state) {
+                case EMPTY:
+                  return canNotUseTabletAnyMore(event)
+                      ? TsFileEpoch.State.USING_TSFILE
+                      : TsFileEpoch.State.USING_TABLET;
+                case USING_TSFILE:
+                  return canNotUseTabletAnyMore(event)
+                      ? TsFileEpoch.State.USING_TSFILE
+                      : TsFileEpoch.State.USING_BOTH;
+                case USING_TABLET:
+                case USING_BOTH:
+                default:
+                  return state;
               }
-
-              return canNotUseTabletAnyMore(event)
-                  ? TsFileEpoch.State.USING_TSFILE
-                  : TsFileEpoch.State.USING_TABLET;
             });
 
     final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
index 18384fae48f..045f0201fcc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.metric.overview;
 
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
@@ -84,6 +85,31 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
         String.valueOf(operator.getCreationTime()));
   }
 
+  public boolean mayRemainingInsertEventExceedLimit(final String pipeID) {
+    if (Objects.isNull(metricService)) {
+      return true;
+    }
+
+    if (remainingEventAndTimeOperatorMap.values().stream()
+            
.map(PipeDataNodeRemainingEventAndTimeOperator::getRemainingInsertEventSmoothingCount)
+            .reduce(0d, Double::sum)
+        > 
PipeConfig.getInstance().getPipeMaxAllowedTotalRemainingInsertEventCount()) {
+      return true;
+    }
+
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.get(pipeID);
+    if (Objects.isNull(operator)) {
+      LOGGER.warn(
+          "Failed to get remaining insert event, 
RemainingEventAndTimeOperator({}) does not exist, will degrade anyway",
+          pipeID);
+      return true;
+    }
+
+    return operator.getRemainingInsertEventSmoothingCount()
+        > 
PipeConfig.getInstance().getPipeMaxAllowedRemainingInsertEventCountPerPipe();
+  }
+
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
     
ImmutableSet.copyOf(remainingEventAndTimeOperatorMap.keySet()).forEach(this::deregister);
@@ -147,20 +173,36 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
-  public void increaseTabletEventCount(final String pipeName, final long 
creationTime) {
+  public void increaseInsertNodeEventCount(final String pipeName, final long 
creationTime) {
+    remainingEventAndTimeOperatorMap
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
+        .increaseInsertNodeEventCount();
+  }
+
+  public void decreaseInsertNodeEventCount(final String pipeName, final long 
creationTime) {
+    remainingEventAndTimeOperatorMap
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
+        .decreaseInsertNodeEventCount();
+  }
+
+  public void increaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
         .computeIfAbsent(
             pipeName + "_" + creationTime,
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .increaseTabletEventCount();
+        .increaseRawTabletEventCount();
   }
 
-  public void decreaseTabletEventCount(final String pipeName, final long 
creationTime) {
+  public void decreaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
         .computeIfAbsent(
             pipeName + "_" + creationTime,
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .decreaseTabletEventCount();
+        .decreaseRawTabletEventCount();
   }
 
   public void increaseTsFileEventCount(final String pipeName, final long 
creationTime) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 09f6823f03d..218448da0d6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -44,7 +44,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-  private final AtomicInteger tabletEventCount = new AtomicInteger(0);
+  private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
+  private final AtomicInteger rawTabletEventCount = new AtomicInteger(0);
   private final AtomicInteger tsfileEventCount = new AtomicInteger(0);
   private final AtomicInteger heartbeatEventCount = new AtomicInteger(0);
 
@@ -53,6 +54,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   private final IoTDBHistogram collectInvocationHistogram =
       (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();
 
+  private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE;
+  private final Meter insertNodeEventCountMeter =
+      new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
+
   private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
   private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
 
@@ -62,12 +67,20 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
 
   //////////////////////////// Remaining event & time calculation 
////////////////////////////
 
-  void increaseTabletEventCount() {
-    tabletEventCount.incrementAndGet();
+  void increaseInsertNodeEventCount() {
+    insertNodeEventCount.incrementAndGet();
+  }
+
+  void decreaseInsertNodeEventCount() {
+    insertNodeEventCount.decrementAndGet();
+  }
+
+  void increaseRawTabletEventCount() {
+    rawTabletEventCount.incrementAndGet();
   }
 
-  void decreaseTabletEventCount() {
-    tabletEventCount.decrementAndGet();
+  void decreaseRawTabletEventCount() {
+    rawTabletEventCount.decrementAndGet();
   }
 
   void increaseTsFileEventCount() {
@@ -86,10 +99,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     heartbeatEventCount.decrementAndGet();
   }
 
+  double getRemainingInsertEventSmoothingCount() {
+    if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
+        >= 
PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds())
 {
+      insertNodeEventCountMeter.mark(insertNodeEventCount.get());
+      lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
+    }
+    return PipeConfig.getInstance()
+        .getPipeRemainingTimeCommitRateAverageTime()
+        .getMeterRate(insertNodeEventCountMeter);
+  }
+
   long getRemainingEvents() {
     final long remainingEvents =
         tsfileEventCount.get()
-            + tabletEventCount.get()
+            + rawTabletEventCount.get()
+            + insertNodeEventCount.get()
             + heartbeatEventCount.get()
             + schemaRegionExtractors.stream()
                 .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
@@ -116,7 +141,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     final double invocationValue = collectInvocationHistogram.getMean();
     // Do not take heartbeat event into account
     final double totalDataRegionWriteEventCount =
-        tsfileEventCount.get() * Math.max(invocationValue, 1) + 
tabletEventCount.get();
+        tsfileEventCount.get() * Math.max(invocationValue, 1)
+            + rawTabletEventCount.get()
+            + insertNodeEventCount.get();
 
     dataRegionCommitMeter.updateAndGet(
         meter -> {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a813cf216e1..b55ef0347f1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -265,10 +265,10 @@ public class CommonConfig {
   private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
   private double pipeReceiverActualToEstimatedMemoryRatio = 3;
 
-  private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
-  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
-  private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
-  private long pipeMaxAllowedLinkedTsFileCount = 300;
+  private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; 
// Deprecated
+  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 
Integer.MAX_VALUE; // Deprecated
+  private int pipeMaxAllowedPinnedMemTableCount = Integer.MAX_VALUE; // per 
data region
+  private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
   private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
@@ -276,6 +276,9 @@ public class CommonConfig {
   private long pipeFlushAfterLastTerminateSeconds = 30;
   private long pipeFlushAfterTerminateCount = 30;
   private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
+  private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
+  private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
+  private int pipeRemainingEventCountSmoothingIntervalSeconds = 15;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
   private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -1443,6 +1446,57 @@ public class CommonConfig {
     return pipeStorageEngineFlushTimeIntervalMs;
   }
 
+  public int getPipeMaxAllowedRemainingInsertEventCountPerPipe() {
+    return pipeMaxAllowedRemainingInsertEventCountPerPipe;
+  }
+
+  public void setPipeMaxAllowedRemainingInsertEventCountPerPipe(
+      int pipeMaxAllowedRemainingInsertEventCountPerPipe) {
+    if (this.pipeMaxAllowedRemainingInsertEventCountPerPipe
+        == pipeMaxAllowedRemainingInsertEventCountPerPipe) {
+      return;
+    }
+    this.pipeMaxAllowedRemainingInsertEventCountPerPipe =
+        pipeMaxAllowedRemainingInsertEventCountPerPipe;
+    logger.info(
+        "pipeMaxAllowedRemainingInsertEventCount is set to {}",
+        pipeMaxAllowedRemainingInsertEventCountPerPipe);
+  }
+
+  public int getPipeMaxAllowedTotalRemainingInsertEventCount() {
+    return pipeMaxAllowedTotalRemainingInsertEventCount;
+  }
+
+  public void setPipeMaxAllowedTotalRemainingInsertEventCount(
+      int pipeMaxAllowedTotalRemainingInsertEventCount) {
+    if (this.pipeMaxAllowedTotalRemainingInsertEventCount
+        == pipeMaxAllowedTotalRemainingInsertEventCount) {
+      return;
+    }
+    this.pipeMaxAllowedTotalRemainingInsertEventCount =
+        pipeMaxAllowedTotalRemainingInsertEventCount;
+    logger.info(
+        "pipeMaxAllowedTotalRemainingInsertEventCount is set to {}",
+        pipeMaxAllowedTotalRemainingInsertEventCount);
+  }
+
+  public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
+    return pipeRemainingEventCountSmoothingIntervalSeconds;
+  }
+
+  public void setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
+      int pipeRemainingEventCountSmoothingIntervalSeconds) {
+    if (this.pipeRemainingEventCountSmoothingIntervalSeconds
+        == pipeRemainingEventCountSmoothingIntervalSeconds) {
+      return;
+    }
+    this.pipeRemainingEventCountSmoothingIntervalSeconds =
+        pipeRemainingEventCountSmoothingIntervalSeconds;
+    logger.info(
+        "pipeRemainingEventCountSmoothingIntervalSeconds is set to {}",
+        pipeRemainingEventCountSmoothingIntervalSeconds);
+  }
+
   public void setPipeStuckRestartIntervalSeconds(long 
pipeStuckRestartIntervalSeconds) {
     if (this.pipeStuckRestartIntervalSeconds == 
pipeStuckRestartIntervalSeconds) {
       return;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9b63b46626c..851ed8c4b5d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -315,6 +315,18 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
   }
 
+  public int getPipeMaxAllowedRemainingInsertEventCountPerPipe() {
+    return COMMON_CONFIG.getPipeMaxAllowedRemainingInsertEventCountPerPipe();
+  }
+
+  public int getPipeMaxAllowedTotalRemainingInsertEventCount() {
+    return COMMON_CONFIG.getPipeMaxAllowedTotalRemainingInsertEventCount();
+  }
+
+  public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
+    return 
COMMON_CONFIG.getPipeRemainingInsertEventCountSmoothingIntervalSeconds();
+  }
+
   /////////////////////////////// Logger ///////////////////////////////
 
   public int getPipeMetaReportMaxLogNumPerRound() {
@@ -542,6 +554,15 @@ public class PipeConfig {
     LOGGER.info("PipeFlushAfterLastTerminateSeconds: {}", 
getPipeFlushAfterLastTerminateSeconds());
     LOGGER.info(
         "PipeStorageEngineFlushTimeIntervalMs: {}", 
getPipeStorageEngineFlushTimeIntervalMs());
+    LOGGER.info(
+        "PipeMaxAllowedRemainingInsertEventCountPerPipe: {}",
+        getPipeMaxAllowedRemainingInsertEventCountPerPipe());
+    LOGGER.info(
+        "PipeMaxAllowedTotalRemainingInsertEventCount: {}",
+        getPipeMaxAllowedTotalRemainingInsertEventCount());
+    LOGGER.info(
+        "PipeRemainingInsertEventCountSmoothingIntervalSeconds: {}",
+        getPipeRemainingInsertEventCountSmoothingIntervalSeconds());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 919df926189..f25c285ae5a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -426,6 +426,22 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_stuck_restart_interval_seconds",
                 String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
+    config.setPipeMaxAllowedRemainingInsertEventCountPerPipe(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_max_allowed_remaining_insert_event_count_per_pipe",
+                
String.valueOf(config.getPipeMaxAllowedRemainingInsertEventCountPerPipe()))));
+    config.setPipeMaxAllowedTotalRemainingInsertEventCount(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_max_allowed_total_remaining_insert_event_count",
+                
String.valueOf(config.getPipeMaxAllowedTotalRemainingInsertEventCount()))));
+    config.setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_remaining_insert_event_count_smoothing_interval_seconds",
+                String.valueOf(
+                    
config.getPipeRemainingInsertEventCountSmoothingIntervalSeconds()))));
     config.setPipeStuckRestartMinIntervalMs(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index 8e18fcfa546..49699fdf878 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -29,6 +29,8 @@ public class PipeLogManager {
 
   private final ConcurrentMap<Class<?>, PipeLogStatus> logClass2LogStatusMap =
       new ConcurrentHashMap<>();
+  private final ConcurrentMap<Class<?>, ConcurrentMap<Object, PipeLogStatus>>
+      logClass2Key2StatusMap = new ConcurrentHashMap<>();
 
   public Optional<Logger> schedule(
       final Class<?> logClass,
@@ -40,4 +42,16 @@ public class PipeLogManager {
             logClass, k -> new PipeLogStatus(logClass, maxAverageScale, 
maxLogInterval))
         .schedule(scale);
   }
+
+  public Optional<Logger> schedule(
+      final Class<?> logClass,
+      final Object key,
+      final int maxAverageScale,
+      final int maxLogInterval,
+      final int scale) {
+    return logClass2Key2StatusMap
+        .computeIfAbsent(logClass, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(key, k -> new PipeLogStatus(logClass, 
maxAverageScale, maxLogInterval))
+        .schedule(scale);
+  }
 }


Reply via email to