This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cb13ed1613 Pipe: timely restart pipes according to the deletion of 
original TsFiles (#12406)
2cb13ed1613 is described below

commit 2cb13ed1613708cef728550cc4959b8ef785071a
Author: Zikun Ma <[email protected]>
AuthorDate: Tue May 14 14:40:51 2024 +0800

    Pipe: timely restart pipes according to the deletion of original TsFiles 
(#12406)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 55 +++++++++++++++++++---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +-
 .../pipe/resource/tsfile/PipeTsFileResource.java   | 33 +++++++++++--
 .../resource/tsfile/PipeTsFileResourceManager.java | 35 ++++++++++++--
 .../db/storageengine/dataregion/DataRegion.java    |  2 +-
 .../resource/PipeTsFileResourceManagerTest.java    | 16 +++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 11 +++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 +++
 9 files changed, 143 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 327fe6190d6..550514af081 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -50,6 +52,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSc
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.SystemMetric;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -224,8 +228,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   public void stopAllPipesWithCriticalException() {
-    super.stopAllPipesWithCriticalException(
-        IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
+    super.stopAllPipesWithCriticalException(CONFIG.getDataNodeId());
   }
 
   ///////////////////////// Heartbeat /////////////////////////
@@ -324,14 +327,30 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
           taskId2ExtractorMap.values().stream()
               .filter(e -> e.getPipeName().equals(pipeName))
               .collect(Collectors.toList());
-      if (extractors.isEmpty()
-          || !extractors.get(0).isStreamMode()
+
+      if (extractors.isEmpty()) {
+        continue;
+      }
+
+      if (!extractors.get(0).isStreamMode()
           || extractors.stream()
               
.noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) {
+        // Extractors of this pipe might not pin too much MemTables,
+        // still need to check if linked-and-deleted TsFile count exceeds 
limit.
+        if ((CONFIG.isEnableSeqSpaceCompaction()
+                || CONFIG.isEnableUnseqSpaceCompaction()
+                || CONFIG.isEnableCrossSpaceCompaction())
+            && mayDeletedTsFileSizeReachDangerousThreshold()) {
+          LOGGER.warn(
+              "Pipe {} needs to restart because too many TsFiles are 
out-of-date.",
+              pipeMeta.getStaticMeta());
+          stuckPipes.add(pipeMeta);
+        }
         continue;
       }
 
       if (mayMemTablePinnedCountReachDangerousThreshold() || 
mayWalSizeReachThrottleThreshold()) {
+        // Extractors of this pipe may be stuck and pinning too much MemTables.
         LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
         stuckPipes.add(pipeMeta);
       }
@@ -341,14 +360,38 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     stuckPipes.parallelStream().forEach(this::restartStuckPipe);
   }
 
+  private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
+    try {
+      final long linkedButDeletedTsFileSize =
+          PipeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
+      final double totalDisk =
+          MetricService.getInstance()
+              .getAutoGauge(
+                  SystemMetric.SYS_DISK_TOTAL_SPACE.toString(),
+                  MetricLevel.CORE,
+                  Tag.NAME.toString(),
+                  // This "system" should stay the same with the one in
+                  // DataNodeInternalRPCServiceImpl.
+                  "system")
+              .getValue();
+      return linkedButDeletedTsFileSize > 0
+          && totalDisk > 0
+          && linkedButDeletedTsFileSize
+              > 
PipeConfig.getInstance().getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()
+                  * totalDisk;
+    } catch (Exception e) {
+      LOGGER.warn("Failed to judge if deleted TsFile size reaches dangerous 
threshold.", e);
+      return false;
+    }
+  }
+
   private boolean mayMemTablePinnedCountReachDangerousThreshold() {
     return PipeResourceManager.wal().getPinnedWalCount()
         >= 10 * 
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
-    return 3 * WALManager.getInstance().getTotalDiskUsage()
-        > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+    return 3 * WALManager.getInstance().getTotalDiskUsage() > 2 * 
CONFIG.getThrottleThreshold();
   }
 
   private void restartStuckPipe(PipeMeta pipeMeta) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 3d951d0e394..cdab8d42215 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -178,9 +178,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
     try {
-      tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile, 
true);
+      tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile, 
true, resource);
       if (isWithMod) {
-        modFile = PipeResourceManager.tsfile().increaseFileReference(modFile, 
false);
+        modFile = PipeResourceManager.tsfile().increaseFileReference(modFile, 
false, null);
       }
       return true;
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index d0f54ed934b..b49ca5de4e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -38,6 +40,7 @@ import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,22 +48,29 @@ public class PipeTsFileResource implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResource.class);
 
+  public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
+  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
+
   private final File hardlinkOrCopiedFile;
   private final boolean isTsFile;
 
-  public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
+  /** this TsFileResource is used to track the {@link TsFileResourceStatus} of 
original TsFile. * */
+  private final TsFileResource tsFileResource;
+
+  private volatile long fileSize = -1L;
+
   private final AtomicInteger referenceCount;
   private final AtomicLong lastUnpinToZeroTime;
-
-  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
   private PipeMemoryBlock allocatedMemoryBlock;
   private Map<IDeviceID, List<String>> deviceMeasurementsMap = null;
   private Map<IDeviceID, Boolean> deviceIsAlignedMap = null;
   private Map<String, TSDataType> measurementDataTypeMap = null;
 
-  public PipeTsFileResource(File hardlinkOrCopiedFile, boolean isTsFile) {
+  public PipeTsFileResource(
+      File hardlinkOrCopiedFile, boolean isTsFile, TsFileResource 
tsFileResource) {
     this.hardlinkOrCopiedFile = hardlinkOrCopiedFile;
     this.isTsFile = isTsFile;
+    this.tsFileResource = tsFileResource;
 
     referenceCount = new AtomicInteger(1);
     lastUnpinToZeroTime = new AtomicLong(Long.MAX_VALUE);
@@ -70,6 +80,21 @@ public class PipeTsFileResource implements AutoCloseable {
     return hardlinkOrCopiedFile;
   }
 
+  public boolean isOriginalTsFileDeleted() {
+    return isTsFile && Objects.nonNull(tsFileResource) && 
tsFileResource.isDeleted();
+  }
+
+  public long getFileSize() {
+    if (fileSize == -1L) {
+      synchronized (this) {
+        if (fileSize == -1L) {
+          fileSize = hardlinkOrCopiedFile.length();
+        }
+      }
+    }
+    return fileSize;
+  }
+
   ///////////////////// Reference Count /////////////////////
 
   public int getReferenceCount() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 3b564c00ff7..0d064cd8b06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -124,10 +124,13 @@ public class PipeTsFileResourceManager {
    * @param file tsfile, resource file or mod file. can be original file or 
hardlink/copy of
    *     original file
    * @param isTsFile {@code true} to create hardlink, {@code false} to copy 
file
+   * @param tsFileResource the TsFileResource of original TsFile. Ignored if 
{@param isTsFile} is
+   *     {@code false}.
    * @return the hardlink or copied file
    * @throws IOException when create hardlink or copy file failed
    */
-  public File increaseFileReference(File file, boolean isTsFile) throws 
IOException {
+  public File increaseFileReference(File file, boolean isTsFile, 
TsFileResource tsFileResource)
+      throws IOException {
     lock.lock();
     try {
       // If the file is already a hardlink or copied file,
@@ -155,7 +158,7 @@ public class PipeTsFileResourceManager {
       // file in pipe dir, create a hardlink or copy it to pipe dir, maintain 
a reference count for
       // the hardlink or copied file, and return the hardlink or copied file.
       hardlinkOrCopiedFileToPipeTsFileResourceMap.put(
-          resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile));
+          resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile, 
tsFileResource));
       return resultFile;
     } finally {
       lock.unlock();
@@ -301,9 +304,9 @@ public class PipeTsFileResourceManager {
   public void pinTsFileResource(TsFileResource resource, boolean withMods) 
throws IOException {
     lock.lock();
     try {
-      increaseFileReference(resource.getTsFile(), true);
+      increaseFileReference(resource.getTsFile(), true, resource);
       if (withMods && resource.getModFile().exists()) {
-        increaseFileReference(new File(resource.getModFile().getFilePath()), 
false);
+        increaseFileReference(new File(resource.getModFile().getFilePath()), 
false, null);
       }
     } finally {
       lock.unlock();
@@ -333,4 +336,28 @@ public class PipeTsFileResourceManager {
       lock.unlock();
     }
   }
+
+  /**
+   * Get the total size of linked TsFiles whose original TsFile is deleted (by 
compaction or else)
+   */
+  public long getTotalLinkedButDeletedTsfileSize() {
+    lock.lock();
+    try {
+      return 
hardlinkOrCopiedFileToPipeTsFileResourceMap.values().parallelStream()
+          .filter(PipeTsFileResource::isOriginalTsFileDeleted)
+          .mapToLong(
+              resource -> {
+                try {
+                  return resource.getFileSize();
+                } catch (Exception e) {
+                  LOGGER.warn(
+                      "failed to get file size of linked but deleted TsFile 
{}: ", resource, e);
+                  return 0;
+                }
+              })
+          .sum();
+    } finally {
+      lock.unlock();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 18dcadffc07..8a7f9ff663b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2891,7 +2891,7 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     // help tsfile resource degrade
-    
TsFileResourceManager.getInstance().registerSealedTsFileResource(tsFileResource);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
 
     tsFileManager.add(tsFileResource, false);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
index 37c25d39556..be5e2c3a633 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
@@ -162,8 +162,8 @@ public class PipeTsFileResourceManagerTest {
     Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originTsfile));
     Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originModFile));
 
-    File pipeTsfile = 
pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
-    File pipeModFile = 
pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+    File pipeTsfile = 
pipeTsFileResourceManager.increaseFileReference(originTsfile, true, null);
+    File pipeModFile = 
pipeTsFileResourceManager.increaseFileReference(originModFile, false, null);
     Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
     Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(originTsfile.toPath()));
@@ -171,19 +171,19 @@ public class PipeTsFileResourceManagerTest {
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
 
-    pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
-    pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+    pipeTsFileResourceManager.increaseFileReference(originTsfile, true, null);
+    pipeTsFileResourceManager.increaseFileReference(originModFile, false, 
null);
     Assert.assertEquals(2, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
     Assert.assertEquals(2, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
 
     // test use hardlinkTsFile to increase reference counts
-    pipeTsFileResourceManager.increaseFileReference(pipeTsfile, true);
+    pipeTsFileResourceManager.increaseFileReference(pipeTsfile, true, null);
     Assert.assertEquals(3, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
     Assert.assertTrue(Files.exists(originTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
 
     // test use copyFile to increase reference counts
-    pipeTsFileResourceManager.increaseFileReference(pipeModFile, false);
+    pipeTsFileResourceManager.increaseFileReference(pipeModFile, false, null);
     Assert.assertEquals(3, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
@@ -199,8 +199,8 @@ public class PipeTsFileResourceManagerTest {
     Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originFile));
     Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originModFile));
 
-    File pipeTsfile = 
pipeTsFileResourceManager.increaseFileReference(originFile, true);
-    File pipeModFile = 
pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+    File pipeTsfile = 
pipeTsFileResourceManager.increaseFileReference(originFile, true, null);
+    File pipeModFile = 
pipeTsFileResourceManager.increaseFileReference(originModFile, false, null);
     Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
     Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 86014cd7b6c..8cc7ff7eefa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -211,6 +211,7 @@ public class CommonConfig {
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
   private int pipeMaxAllowedPinnedMemTableCount = 50;
   private long pipeMaxAllowedLinkedTsFileCount = 100;
+  private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -866,6 +867,16 @@ public class CommonConfig {
     this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
   }
 
+  public float getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage() {
+    return pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage;
+  }
+
+  public void setPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage(
+      float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage) {
+    this.pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage =
+        pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage;
+  }
+
   public long getPipeStuckRestartIntervalSeconds() {
     return pipeStuckRestartIntervalSeconds;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 7c9f9c813ee..9d0bd3cc2c0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -445,6 +445,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_max_allowed_linked_tsfile_count",
                 String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
+    config.setPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage(
+        Float.parseFloat(
+            properties.getProperty(
+                "pipe_max_allowed_linked_deleted_tsfile_disk_usage_percentage",
+                
String.valueOf(config.getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()))));
     config.setPipeStuckRestartIntervalSeconds(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 7303659d51b..351f4da40ad 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -193,6 +193,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
   }
 
+  public float getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage() {
+    return 
COMMON_CONFIG.getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage();
+  }
+
   public long getPipeStuckRestartIntervalSeconds() {
     return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
   }
@@ -341,6 +345,9 @@ public class PipeConfig {
         getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
     LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}", 
getPipeMaxAllowedPinnedMemTableCount());
     LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", 
getPipeMaxAllowedLinkedTsFileCount());
+    LOGGER.info(
+        "PipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage: {}",
+        getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", 
getPipeStuckRestartIntervalSeconds());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());

Reply via email to