This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch optimize-pipe-after-stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 22ed3442c0c97eb9cc8f369071eb362049c5dd75
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 10 12:36:37 2025 +0800

    op
---
 .../realtime/PipeRealtimeDataRegionHybridExtractor.java        |  7 +++++++
 .../storageengine/dataregion/wal/utils/WALInsertNodeCache.java |  7 +++++--
 .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java  | 10 ++++++++++
 .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java   |  5 +++++
 .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java  |  7 +++++++
 5 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 26cc157e7ee..b1baca7c42a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -48,6 +48,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
+  private final boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled =
+      PipeConfig.getInstance().isPipeEpochKeepTsFileAfterStuckRestartEnabled();
+
   @Override
   protected void doExtract(final PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
@@ -223,6 +226,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   }
 
   private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
+    if (!isPipeEpochKeepTsFileAfterStuckRestartEnabled) {
+      return false;
+    }
+
     final boolean isPipeTaskCurrentlyRestarted =
         PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
     if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 76857c04983..0541ff96188 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -58,6 +59,7 @@ public class WALInsertNodeCache {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(WALInsertNodeCache.class);
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   // Used to adjust the memory usage of the cache
@@ -75,8 +77,9 @@ public class WALInsertNodeCache {
     final long requestedAllocateSize =
         (long)
             Math.min(
-                (double) 2 * CONFIG.getWalFileSizeThresholdInByte(),
-                CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
+                (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
+                    * CONFIG.getWalFileSizeThresholdInByte(),
+                CONFIG.getAllocateMemoryForPipe() * 0.45);
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
             .tryAllocate(requestedAllocateSize)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e3bf41954eb..340e7e26462 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -265,6 +265,7 @@ public class CommonConfig {
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
   private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
+  private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
   private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -1088,6 +1089,10 @@ public class CommonConfig {
     return pipeStuckRestartMinIntervalMs;
   }
 
+  public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
+    return pipeEpochKeepTsFileAfterStuckRestartEnabled;
+  }
+
   public long getPipeStorageEngineFlushTimeIntervalMs() {
     return pipeStorageEngineFlushTimeIntervalMs;
   }
@@ -1100,6 +1105,11 @@ public class CommonConfig {
     this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
   }
 
+  public void setPipeEpochKeepTsFileAfterStuckRestartEnabled(
+      boolean pipeEpochKeepTsFileAfterStuckRestartEnabled) {
+    this.pipeEpochKeepTsFileAfterStuckRestartEnabled = pipeEpochKeepTsFileAfterStuckRestartEnabled;
+  }
+
   public void setPipeStorageEngineFlushTimeIntervalMs(long pipeStorageEngineFlushTimeIntervalMs) {
     this.pipeStorageEngineFlushTimeIntervalMs = pipeStorageEngineFlushTimeIntervalMs;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 4b813672669..10b6978149c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -540,6 +540,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_stuck_restart_min_interval_ms",
                 String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+    config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_epoch_keep_tsfile_after_stuck_restart_enabled",
+                String.valueOf(config.isPipeEpochKeepTsFileAfterStuckRestartEnabled()))));
     config.setPipeStorageEngineFlushTimeIntervalMs(
         Long.parseLong(
             properties.getProperty(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 8cd4b4d78d5..c0cc1a4d9fb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -268,6 +268,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
   }
 
+  public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
+    return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
+  }
+
   public long getPipeStorageEngineFlushTimeIntervalMs() {
     return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
   }
@@ -475,6 +479,9 @@ public class PipeConfig {
         getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds());
     LOGGER.info("PipeStuckRestartMinIntervalMs: {}", getPipeStuckRestartMinIntervalMs());
+    LOGGER.info(
+        "PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
+        isPipeEpochKeepTsFileAfterStuckRestartEnabled());
     LOGGER.info(
         "PipeStorageEngineFlushTimeIntervalMs: {}", getPipeStorageEngineFlushTimeIntervalMs());
 

Reply via email to