This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit adef295839ab38c1c10705d7049ebeb01a05417d
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jul 25 17:03:03 2025 +0800

    Pipe: Simplified the hybrid down-grading logic (#16033)
    
    * simplify
    
    * comp
    
    (cherry picked from commit 2a8192a428da3516e44078566f750cdf52f88ff5)
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 231 ++++-----------------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  49 -----
 .../iotdb/commons/pipe/config/PipeConfig.java      |  22 --
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  16 --
 4 files changed, 40 insertions(+), 278 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index f1ddff90c67..516ab4d05ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -20,19 +20,16 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
-import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -41,7 +38,6 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Objects;
 import java.util.Optional;
 
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
@@ -82,7 +78,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private void extractTabletInsertion(final PipeRealtimeEvent event) {
     TsFileEpoch.State state;
 
-    if (canNotUseTabletAnyMore(event)) {
+    if (canNotUseTabletAnymore(event)) {
       event.getTsFileEpoch().migrateState(this, curState -> 
TsFileEpoch.State.USING_TSFILE);
       PipeTsFileEpochProgressIndexKeeper.getInstance()
           .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
@@ -162,7 +158,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                   return TsFileEpoch.State.USING_TSFILE;
                 case USING_BOTH:
                 default:
-                  return canNotUseTabletAnyMore(event)
+                  return canNotUseTabletAnymore(event)
                       ? TsFileEpoch.State.USING_TSFILE
                       : TsFileEpoch.State.USING_BOTH;
               }
@@ -171,9 +167,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
     switch (state) {
       case USING_TABLET:
-        // Though the data in tsfile event has been extracted in tablet mode, 
we still need to
-        // extract the tsfile event to help to determine 
isTsFileEventCountInQueueExceededLimit().
-        // The extracted tsfile event will be discarded in 
supplyTsFileInsertion.
+        // If the state is USING_TABLET, discard the event
+        PipeTsFileEpochProgressIndexKeeper.getInstance()
+            .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
+        return;
       case EMPTY:
       case USING_TSFILE:
       case USING_BOTH:
@@ -202,17 +199,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     }
   }
 
-  private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) {
-    // In the following 4 cases, we should not extract this tablet event. all 
the data
-    // represented by the tablet event should be carried by the following 
tsfile event:
-    //  the write operation will be throttled, so we should not extract any 
more tablet events.
-    //  1. The shallow memory usage of the insert node has reached the 
dangerous threshold.
-    //  2. Deprecated logics (unused by default)
-    return mayInsertNodeMemoryReachDangerousThreshold(event)
-        || canNotUseTabletAnymoreDeprecated(event);
-  }
-
-  private boolean mayInsertNodeMemoryReachDangerousThreshold(final 
PipeRealtimeEvent event) {
+  // If the insertNode's memory has reached the dangerous threshold, we should 
not extract any
+  // tablets.
+  private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
     final long floatingMemoryUsageInByte =
         PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
     final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
@@ -224,7 +213,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       final PipeDataNodeRemainingEventAndTimeOperator operator =
           
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory 
usage of the insert node {} has reached the dangerous threshold of single pipe 
{}, event count: {}",
+          "Pipe task {}@{} canNotUseTabletAnyMore for tsFile {}: The memory 
usage of the insert node {} has reached the dangerous threshold of single pipe 
{}, event count: {}",
           pipeName,
           dataRegionId,
           event.getTsFileEpoch().getFilePath(),
@@ -237,83 +226,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return mayInsertNodeMemoryReachDangerousThreshold;
   }
 
-  /**
-   * These judgements are deprecated, and are only reserved for manual 
operation and compatibility.
-   */
-  @Deprecated
-  private boolean canNotUseTabletAnymoreDeprecated(final PipeRealtimeEvent 
event) {
-    // In the following 5 cases, we should not extract any more tablet events. 
all the data
-    // represented by the tablet events should be carried by the following 
tsfile event:
-    //  1. The number of historical tsFile events to transfer has exceeded the 
limit.
-    //  2. The number of realtime tsfile events to transfer has exceeded the 
limit.
-    //  3. The number of linked tsFiles has reached the dangerous threshold.
-    return isHistoricalTsFileEventCountExceededLimit(event)
-        || isRealtimeTsFileEventCountExceededLimit(event)
-        || mayTsFileLinkedCountReachDangerousThreshold(event);
-  }
-
-  private boolean isHistoricalTsFileEventCountExceededLimit(final 
PipeRealtimeEvent event) {
-    if 
(PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()
-        == Integer.MAX_VALUE) {
-      return false;
-    }
-    final IoTDBDataRegionExtractor extractor =
-        
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
-    final boolean isHistoricalTsFileEventCountExceededLimit =
-        Objects.nonNull(extractor)
-            && extractor.getHistoricalTsFileInsertionEventCount()
-                >= 
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
-    if (isHistoricalTsFileEventCountExceededLimit && 
event.mayExtractorUseTablets(this)) {
-      LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1) for tsFile {}: 
The number of historical tsFile events {} has exceeded the limit {}",
-          pipeName,
-          dataRegionId,
-          event.getTsFileEpoch().getFilePath(),
-          extractor.getHistoricalTsFileInsertionEventCount(),
-          
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion());
-    }
-    return isHistoricalTsFileEventCountExceededLimit;
-  }
-
-  private boolean isRealtimeTsFileEventCountExceededLimit(final 
PipeRealtimeEvent event) {
-    if 
(PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()
-        == Integer.MAX_VALUE) {
-      return false;
-    }
-    final boolean isRealtimeTsFileEventCountExceededLimit =
-        pendingQueue.getTsFileInsertionEventCount()
-            >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
-    if (isRealtimeTsFileEventCountExceededLimit && 
event.mayExtractorUseTablets(this)) {
-      LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2) for tsFile {}: 
The number of realtime tsFile events {} has exceeded the limit {}",
-          pipeName,
-          dataRegionId,
-          event.getTsFileEpoch().getFilePath(),
-          pendingQueue.getTsFileInsertionEventCount(),
-          
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
-    }
-    return isRealtimeTsFileEventCountExceededLimit;
-  }
-
-  private boolean mayTsFileLinkedCountReachDangerousThreshold(final 
PipeRealtimeEvent event) {
-    if (PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount() == 
Long.MAX_VALUE) {
-      return false;
-    }
-    final boolean mayTsFileLinkedCountReachDangerousThreshold =
-        PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName)
-            >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
-    if (mayTsFileLinkedCountReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
-      LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3) for tsFile {}: 
The number of linked tsFiles {} has reached the dangerous threshold {}",
-          pipeName,
-          dataRegionId,
-          event.getTsFileEpoch().getFilePath(),
-          PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName),
-          PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount());
-    }
-    return mayTsFileLinkedCountReachDangerousThreshold;
-  }
-
   @Override
   public Event supply() {
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();
@@ -355,103 +267,40 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
-    event
-        .getTsFileEpoch()
-        .migrateState(
-            this,
-            state -> {
-              switch (state) {
-                case EMPTY:
-                  return canNotUseTabletAnyMore(event)
-                      ? TsFileEpoch.State.USING_TSFILE
-                      : TsFileEpoch.State.USING_TABLET;
-                case USING_TSFILE:
-                  return canNotUseTabletAnyMore(event)
-                      ? TsFileEpoch.State.USING_TSFILE
-                      : TsFileEpoch.State.USING_BOTH;
-                case USING_TABLET:
-                case USING_BOTH:
-                default:
-                  return state;
-              }
-            });
-
-    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
-    if (state == TsFileEpoch.State.USING_TSFILE) {
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
-    }
-
-    switch (state) {
-      case USING_TSFILE:
-        // If the state is USING_TSFILE, discard the event and poll the next 
one.
-        return null;
-      case EMPTY:
-      case USING_TABLET:
-      case USING_BOTH:
-      default:
-        if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
-          return event.getEvent();
-        } else {
-          // If the event's reference count can not be increased, it means the 
data represented by
-          // this event is not reliable anymore. but the data represented by 
this event
-          // has been carried by the following tsfile event, so we can just 
discard this event.
-          event.getTsFileEpoch().migrateState(this, s -> 
TsFileEpoch.State.USING_BOTH);
-          LOGGER.warn(
-              "Discard tablet event {} because it is not reliable anymore. "
-                  + "Change the state of TsFileEpoch to USING_TSFILE.",
-              event);
-          return null;
-        }
+    if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
+      return event.getEvent();
+    } else {
+      // If the event's reference count can not be increased, it means the 
data represented by
+      // this event is not reliable anymore. but the data represented by this 
event
+      // has been carried by the following tsfile event, so we can just 
discard this event.
+      event.getTsFileEpoch().migrateState(this, s -> 
TsFileEpoch.State.USING_BOTH);
+      LOGGER.warn(
+          "Discard tablet event {} because it is not reliable anymore. "
+              + "Change the state of TsFileEpoch to USING_BOTH.",
+          event);
+      return null;
     }
   }
 
   private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
-    event
-        .getTsFileEpoch()
-        .migrateState(
-            this,
-            state -> {
-              // This would not happen, but just in case.
-              if (state.equals(TsFileEpoch.State.EMPTY)) {
-                LOGGER.error(
-                    String.format("EMPTY TsFileEpoch when supplying TsFile 
Event %s", event));
-                return TsFileEpoch.State.USING_TSFILE;
-              }
-              return state;
-            });
-
-    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
-    switch (state) {
-      case USING_TABLET:
-        // If the state is USING_TABLET, discard the event and poll the next 
one.
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
-            .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
-        return null;
-      case EMPTY:
-      case USING_TSFILE:
-      case USING_BOTH:
-      default:
-        if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
-          return event.getEvent();
-        } else {
-          // If the event's reference count can not be increased, it means the 
data represented by
-          // this event is not reliable anymore. the data has been lost. we 
simply discard this
-          // event
-          // and report the exception to PipeRuntimeAgent.
-          final String errorMessage =
-              String.format(
-                  "TsFile Event %s can not be supplied because "
-                      + "the reference count can not be increased, "
-                      + "the data represented by this event is lost",
-                  event.getEvent());
-          LOGGER.error(errorMessage);
-          PipeDataNodeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-          PipeTsFileEpochProgressIndexKeeper.getInstance()
-              .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
-          return null;
-        }
+    if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
+      return event.getEvent();
+    } else {
+      // If the event's reference count can not be increased, it means the 
data represented by
+      // this event is not reliable anymore. the data has been lost. we simply 
discard this
+      // event and report the exception to PipeRuntimeAgent.
+      final String errorMessage =
+          String.format(
+              "TsFile Event %s can not be supplied because "
+                  + "the reference count can not be increased, "
+                  + "the data represented by this event is lost",
+              event.getEvent());
+      LOGGER.error(errorMessage);
+      PipeDataNodeAgent.runtime()
+          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+      PipeTsFileEpochProgressIndexKeeper.getInstance()
+          .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
+      return null;
     }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a2bd1ee0f28..f9697bd585f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -295,10 +295,6 @@ public class CommonConfig {
 
   private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
 
-  private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; 
// Deprecated
-  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 
Integer.MAX_VALUE; // Deprecated
-  private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
-
   private double pipeMetaReportMaxLogNumPerRound = 0.1;
   private int pipeMetaReportMaxLogIntervalRounds = 360;
   private int pipeTsFilePinMaxLogNumPerRound = 10;
@@ -1499,51 +1495,6 @@ public class CommonConfig {
     return pipeReceiverReqDecompressedMaxLengthInBytes;
   }
 
-  public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
-    return pipeMaxAllowedHistoricalTsFilePerDataRegion;
-  }
-
-  public void setPipeMaxAllowedHistoricalTsFilePerDataRegion(
-      int pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
-    if (this.pipeMaxAllowedHistoricalTsFilePerDataRegion
-        == pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
-      return;
-    }
-    this.pipeMaxAllowedHistoricalTsFilePerDataRegion =
-        pipeMaxAllowedPendingTsFileEpochPerDataRegion;
-    logger.info(
-        "pipeMaxAllowedHistoricalTsFilePerDataRegion is set to {}",
-        pipeMaxAllowedPendingTsFileEpochPerDataRegion);
-  }
-
-  public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
-    return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
-  }
-
-  public void setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
-      int pipeExtractorPendingQueueTsfileLimit) {
-    if (this.pipeMaxAllowedPendingTsFileEpochPerDataRegion
-        == pipeExtractorPendingQueueTsfileLimit) {
-      return;
-    }
-    this.pipeMaxAllowedPendingTsFileEpochPerDataRegion = 
pipeExtractorPendingQueueTsfileLimit;
-    logger.info(
-        "pipeMaxAllowedPendingTsFileEpochPerDataRegion is set to {}.",
-        pipeMaxAllowedPendingTsFileEpochPerDataRegion);
-  }
-
-  public long getPipeMaxAllowedLinkedTsFileCount() {
-    return pipeMaxAllowedLinkedTsFileCount;
-  }
-
-  public void setPipeMaxAllowedLinkedTsFileCount(long 
pipeMaxAllowedLinkedTsFileCount) {
-    if (this.pipeMaxAllowedLinkedTsFileCount == 
pipeMaxAllowedLinkedTsFileCount) {
-      return;
-    }
-    this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
-    logger.info("pipeMaxAllowedLinkedTsFileCount is set to {}", 
pipeMaxAllowedLinkedTsFileCount);
-  }
-
   public double getPipeMetaReportMaxLogNumPerRound() {
     return pipeMetaReportMaxLogNumPerRound;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ca99d29a1d0..770e0e959b2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -343,20 +343,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
   }
 
-  /////////////////////////////// Hybrid Mode ///////////////////////////////
-
-  public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
-    return COMMON_CONFIG.getPipeMaxAllowedHistoricalTsFilePerDataRegion();
-  }
-
-  public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
-    return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
-  }
-
-  public long getPipeMaxAllowedLinkedTsFileCount() {
-    return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
-  }
-
   /////////////////////////////// Logger ///////////////////////////////
 
   public double getPipeMetaReportMaxLogNumPerRound() {
@@ -588,14 +574,6 @@ public class PipeConfig {
         "PipeReceiverReqDecompressedMaxLengthInBytes: {}",
         getPipeReceiverReqDecompressedMaxLengthInBytes());
 
-    LOGGER.info(
-        "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
-        getPipeMaxAllowedHistoricalTsFilePerDataRegion());
-    LOGGER.info(
-        "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
-        getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
-    LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", 
getPipeMaxAllowedLinkedTsFileCount());
-
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
     LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", 
getPipeTsFilePinMaxLogNumPerRound());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 3e31864e3d8..7e27838618b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -433,22 +433,6 @@ public class PipeDescriptor {
                 "pipe_receiver_req_decompressed_max_length_in_bytes",
                 
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
 
-    config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
-        Integer.parseInt(
-            properties.getProperty(
-                "pipe_max_allowed_historical_tsfile_per_data_region",
-                
String.valueOf(config.getPipeMaxAllowedHistoricalTsFilePerDataRegion()))));
-    config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
-        Integer.parseInt(
-            properties.getProperty(
-                "pipe_max_allowed_pending_tsfile_epoch_per_data_region",
-                
String.valueOf(config.getPipeMaxAllowedPendingTsFileEpochPerDataRegion()))));
-    config.setPipeMaxAllowedLinkedTsFileCount(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_max_allowed_linked_tsfile_count",
-                String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
-
     config.setPipeMemoryAllocateMaxRetries(
         Integer.parseInt(
             properties.getProperty(

Reply via email to