This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-flush by this push:
     new 590831ce426 partial
590831ce426 is described below

commit 590831ce42681fe177f3eff3dfc86f63f8502fc1
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 14:41:50 2026 +0800

    partial
---
 .../PipeRealtimeDataRegionHybridSource.java        |  2 ++
 .../PipeTsFileEpochProgressIndexKeeper.java        | 26 ++++++++++++++++++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 15 +++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  4 ++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  5 +++++
 5 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 0721683f4d2..64fd0846465 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -176,6 +176,8 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo
       case EMPTY:
       case USING_TSFILE:
       case USING_BOTH:
+        PipeTsFileEpochProgressIndexKeeper.getInstance()
+            .markAsExtracted(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
         if (!pendingQueue.waitedOffer(event)) {
           // This would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index ff7d90c377d..fbb6502ab17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
+import org.apache.tsfile.utils.Pair;
+
 import javax.annotation.Nonnull;
 
 import java.util.Map;
@@ -32,15 +34,31 @@ import java.util.concurrent.ConcurrentHashMap;
 public class PipeTsFileEpochProgressIndexKeeper {
 
   // data region id -> pipeName -> tsFile path -> max progress index
-  private final Map<String, Map<String, Map<String, TsFileResource>>> progressIndexKeeper =
-      new ConcurrentHashMap<>();
+  private final Map<String, Map<String, Map<String, Pair<TsFileResource, Long>>>>
+      progressIndexKeeper = new ConcurrentHashMap<>();
 
   public synchronized void registerProgressIndex(
       final String dataRegionId, final String pipeName, final TsFileResource resource) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .putIfAbsent(resource.getTsFilePath(), resource);
+        .putIfAbsent(resource.getTsFilePath(), new Pair<>(resource, System.currentTimeMillis()));
+  }
+
+  public synchronized void markAsExtracted(
+      final String dataRegionId, final String pipeName, final String filePath) {
+    final Pair<TsFileResource, Long> pair =
+        progressIndexKeeper
+            .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+            .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+            .get(filePath);
+    if (Objects.nonNull(pair)) {
+      pair.setRight(Long.MAX_VALUE);
+    }
+  }
+
+  public void flushAllTsFiles(final String dataRegionId) {
+
   }
 
   public synchronized void eliminateProgressIndex(
@@ -64,7 +82,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
         .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
         .map(Entry::getValue)
         .filter(Objects::nonNull)
-        .anyMatch(resource -> !resource.getMaxProgressIndex().isAfter(progressIndex));
+        .anyMatch(resource -> !resource.getLeft().getMaxProgressIndex().isAfter(progressIndex));
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cf68da89553..2e600029241 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -322,6 +322,9 @@ public class CommonConfig {
   private volatile int pipeTsFilePinMaxLogNumPerRound = 10;
   private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
 
+  // <= 0 means disabled
+  private volatile long pipeTsFileFlushIntervalSeconds = 5 * 60L;
+
   private volatile boolean pipeMemoryManagementEnabled = true;
   private volatile long pipeMemoryAllocateRetryIntervalMs = 50;
   private volatile int pipeMemoryAllocateMaxRetries = 10;
@@ -1783,6 +1786,18 @@ public class CommonConfig {
         "pipeTsFilePinMaxLogIntervalRounds is set to {}", pipeTsFilePinMaxLogIntervalRounds);
   }
 
+  public long getPipeTsFileFlushIntervalSeconds() {
+    return pipeTsFileFlushIntervalSeconds;
+  }
+
+  public void setPipeTsFileFlushIntervalSeconds(long pipeTsFileFlushIntervalSeconds) {
+    if (this.pipeTsFileFlushIntervalSeconds == pipeTsFileFlushIntervalSeconds) {
+      return;
+    }
+    this.pipeTsFileFlushIntervalSeconds = pipeTsFileFlushIntervalSeconds;
+    logger.info("pipeTsFileFlushIntervalSeconds is set to {}", pipeTsFileFlushIntervalSeconds);
+  }
+
   public boolean getPipeMemoryManagementEnabled() {
     return pipeMemoryManagementEnabled;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a49caa53368..16539d82d8a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -387,6 +387,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
   }
 
+  public long getPipeTsFileFlushIntervalSeconds() {
+    return COMMON_CONFIG.getPipeTsFileFlushIntervalSeconds();
+  }
+
   public long getPipeLoggerCacheMaxSizeInBytes() {
     return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 832517b9745..6e72b4919fb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -131,6 +131,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_tsfile_pin_max_log_interval_rounds",
                 String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds()))));
+    config.setPipeTsFileFlushIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_tsfile_flush_interval_seconds",
+                String.valueOf(config.getPipeTsFileFlushIntervalSeconds()))));
     config.setPipeMemoryManagementEnabled(
         Boolean.parseBoolean(
             properties.getProperty(

Reply via email to