This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-flush by this push:
     new dea7feb59eb may-fix
dea7feb59eb is described below

commit dea7feb59eb7923629683f033f5d09e9409665fb
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 16:23:50 2026 +0800

    may-fix
---
 .../dataregion/realtime/PipeRealtimeDataRegionHybridSource.java     | 2 +-
 .../dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java     | 6 +++++-
 .../assigner/PipeTsFileEpochProgressIndexAndFlushManager.java       | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 5bf5e3fd4a5..bb26b5e18da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -84,7 +84,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     if (canNotUseTabletAnymore(event)) {
       event.getTsFileEpoch().migrateState(this, curState -> 
TsFileEpoch.State.USING_TSFILE);
       PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
-          .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
+          .registerResource(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
     } else {
       event
           .getTsFileEpoch()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index f6685ced073..7c9d5149817 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -40,6 +40,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
 
   @Override
   protected void doExtract(PipeRealtimeEvent event) {
+    
PipeTsFileEpochProgressIndexAndFlushManager.getInstance().flushAllTimeoutTsFiles();
+
     if (event.getEvent() instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
       return;
@@ -52,7 +54,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
 
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
     PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
-        .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
+        .registerResource(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
@@ -75,6 +77,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
     }
 
+    PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+        .markAsExtracted(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
     event.getTsFileEpoch().clearState(this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
index 677fc92710a..20b2dfe410f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
@@ -40,7 +40,7 @@ public class PipeTsFileEpochProgressIndexAndFlushManager {
   private final Map<Integer, Map<String, Map<String, Pair<TsFileResource, 
Long>>>>
       progressIndexKeeper = new ConcurrentHashMap<>();
 
-  public synchronized void registerProgressIndex(
+  public synchronized void registerResource(
       final int dataRegionId, final String pipeName, final TsFileResource 
resource) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())

Reply via email to