This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7bbdc95f8442ff48337f7efe3ebc45861f651535
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 14:41:50 2026 +0800

    Flush manager
---
 .../tsfile/PipeCompactedTsFileInsertionEvent.java  |   8 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   7 +-
 .../PipeRealtimeDataRegionHybridSource.java        |  13 ++-
 .../realtime/PipeRealtimeDataRegionSource.java     |  10 +-
 .../PipeRealtimeDataRegionTsFileSource.java        |  16 ++-
 ...ipeTsFileEpochProgressIndexAndFlushManager.java | 128 +++++++++++++++++++++
 .../PipeTsFileEpochProgressIndexKeeper.java        |  89 --------------
 .../listener/PipeTimePartitionListener.java        |  50 ++++----
 .../rescon/memory/TimePartitionManager.java        |   2 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  15 +++
 .../iotdb/commons/pipe/config/PipeConfig.java      |   4 +
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   5 +
 12 files changed, 211 insertions(+), 136 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index bf3f2c97acb..078ce718488 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
 
 public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent {
 
-  private final String dataRegionId;
+  private final int dataRegionId;
   private final Set<String> originFilePaths;
   private final List<Long> commitIds;
 
@@ -70,7 +70,7 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
         anyOfOriginalEvents.getStartTime(),
         anyOfOriginalEvents.getEndTime());
 
-    this.dataRegionId = String.valueOf(committerKey.getRegionId());
+    this.dataRegionId = committerKey.getRegionId();
     this.originFilePaths =
         originalEvents.stream()
             .map(PipeTsFileInsertionEvent::getTsFile)
@@ -186,7 +186,7 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex)) {
       for (final String originFilePath : originFilePaths) {
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 6b3e505d2ea..7b07e084a7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -45,7 +45,7 @@ import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -399,8 +399,9 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
 
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .eliminateProgressIndex(resource.getDataRegionId(), pipeName, 
resource.getTsFilePath());
+      PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+          .eliminateProgressIndex(
+              Integer.parseInt(resource.getDataRegionId()), pipeName, 
resource.getTsFilePath());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 0721683f4d2..bb26b5e18da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -65,6 +65,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
               "Unsupported event type %s for hybrid realtime extractor %s",
               eventToExtract.getClass(), this));
     }
+    
PipeTsFileEpochProgressIndexAndFlushManager.getInstance().flushAllTimeoutTsFiles();
   }
 
   @Override
@@ -82,8 +83,8 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
 
     if (canNotUseTabletAnymore(event)) {
       event.getTsFileEpoch().migrateState(this, curState -> 
TsFileEpoch.State.USING_TSFILE);
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
+      PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+          .registerResource(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
     } else {
       event
           .getTsFileEpoch()
@@ -169,13 +170,15 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     switch (state) {
       case USING_TABLET:
         // If the state is USING_TABLET, discard the event
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
         
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
         return;
       case EMPTY:
       case USING_TSFILE:
       case USING_BOTH:
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+            .markAsExtracted(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
         if (!pendingQueue.waitedOffer(event)) {
           // This would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
@@ -310,7 +313,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       LOGGER.error(errorMessage);
       PipeDataNodeAgent.runtime()
           .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
+      PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
           .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
       return null;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index cd00e2975a0..3dec073f057 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -39,7 +39,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeTimePartitionListener;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements 
PipeExtractor {
 
   protected String pipeName;
   protected long creationTime;
-  protected String dataRegionId;
+  protected int dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
   protected boolean shouldExtractInsertion;
@@ -214,7 +214,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight();
 
     pipeName = environment.getPipeName();
-    dataRegionId = String.valueOf(environment.getRegionId());
+    dataRegionId = environment.getRegionId();
     pipeTaskMeta = environment.getPipeTaskMeta();
 
     // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. 
These metrics are
@@ -555,7 +555,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     return skipIfNoPrivileges;
   }
 
-  public final String getDataRegionId() {
+  public final int getDataRegionId() {
     return dataRegionId;
   }
 
@@ -595,7 +595,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   }
 
   private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent 
event) {
-    if (PipeTsFileEpochProgressIndexKeeper.getInstance()
+    if (PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
         .isProgressIndexAfterOrEquals(
             dataRegionId,
             pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 8f74bc63fb3..be1f648d163 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -39,7 +39,9 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileSource.class);
 
   @Override
-  protected void doExtract(PipeRealtimeEvent event) {
+  protected void doExtract(final PipeRealtimeEvent event) {
+    
PipeTsFileEpochProgressIndexAndFlushManager.getInstance().flushAllTimeoutTsFiles();
+
     if (event.getEvent() instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
       return;
@@ -51,8 +53,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
     }
 
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-    PipeTsFileEpochProgressIndexKeeper.getInstance()
-        .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
+    PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+        .registerResource(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
@@ -64,7 +66,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       // Pending is unbounded, so it should never reach capacity.
       final String errorMessage =
           String.format(
-              "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor 
%s "
+              "extract: pending queue of PipeRealtimeDataRegionTsFileSource %s 
"
                   + "has reached capacity, discard TsFile event %s, current 
state %s",
               this, event, event.getTsFileEpoch().getState(this));
       LOGGER.error(errorMessage);
@@ -75,6 +77,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
     }
 
+    PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+        .markAsExtracted(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
     event.getTsFileEpoch().clearState(this);
   }
 
@@ -116,7 +120,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
         LOGGER.error(errorMessage);
         PipeDataNodeAgent.runtime()
             .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .eliminateProgressIndex(
                 dataRegionId, pipeName, 
realtimeEvent.getTsFileEpoch().getFilePath());
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
new file mode 100644
index 00000000000..20b2dfe410f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.utils.Pair;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Keeps the TsFile resources registered per data region and pipe, and asynchronously closes the
+ * ones that have stayed registered for longer than the configured flush interval.
+ */
+public class PipeTsFileEpochProgressIndexAndFlushManager {
+
+  // data region id -> pipeName -> tsFile path -> (resource, register time in ms). The time is
+  // replaced by Long.MAX_VALUE once the file is marked as extracted or has been asked to close.
+  private final Map<Integer, Map<String, Map<String, Pair<TsFileResource, Long>>>>
+      progressIndexKeeper = new ConcurrentHashMap<>();
+
+  /**
+   * Starts tracking {@code resource}, stamped with the current time, unless its TsFile path is
+   * already tracked for the given pipe on the given data region.
+   */
+  public synchronized void registerResource(
+      final int dataRegionId, final String pipeName, final TsFileResource resource) {
+    getFileMap(dataRegionId, pipeName)
+        .putIfAbsent(resource.getTsFilePath(), new Pair<>(resource, System.currentTimeMillis()));
+  }
+
+  /**
+   * Replaces the register time of the tracked file, if any, with {@link Long#MAX_VALUE} so that
+   * {@link #flushAllTimeoutTsFiles()} no longer treats it as overdue.
+   */
+  public synchronized void markAsExtracted(
+      final int dataRegionId, final String pipeName, final String filePath) {
+    final Pair<TsFileResource, Long> pair = getFileMap(dataRegionId, pipeName).get(filePath);
+    if (Objects.nonNull(pair)) {
+      pair.setRight(Long.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Requests an asynchronous close of every tracked TsFile that was registered at least the
+   * configured flush interval ago. Does nothing when the configured interval is not positive.
+   */
+  public void flushAllTimeoutTsFiles() {
+    final long intervalSeconds = PipeConfig.getInstance().getPipeTsFileFlushIntervalSeconds();
+    // A non-positive interval means the timed flush is disabled. Without this guard such an
+    // interval makes every tracked file look overdue, so all of them would be closed right away.
+    if (intervalSeconds <= 0) {
+      return;
+    }
+    final long deadline = System.currentTimeMillis() - intervalSeconds * 1000L;
+    progressIndexKeeper.forEach(
+        (regionId, pipeMap) ->
+            pipeMap.values().stream()
+                .flatMap(fileMap -> fileMap.values().stream())
+                .filter(pair -> pair.getRight() <= deadline)
+                .forEach(pair -> closeTsFile(regionId, pair)));
+  }
+
+  // Asks the data region to close the file's processor, then disarms the entry's timer so the
+  // same file is not requested again.
+  // NOTE(review): presumably getDataRegion can return null for a removed region - confirm and
+  // guard if so.
+  private void closeTsFile(final int dataRegionId, final Pair<TsFileResource, Long> pair) {
+    StorageEngine.getInstance()
+        .getDataRegion(new DataRegionId(dataRegionId))
+        .asyncCloseOneTsFileProcessor(pair.getLeft().isSeq(), pair.getLeft().getProcessor());
+    pair.setRight(Long.MAX_VALUE);
+  }
+
+  /** Stops tracking the given TsFile path for the given pipe on the given data region. */
+  public synchronized void eliminateProgressIndex(
+      final int dataRegionId, final @Nonnull String pipeName, final String filePath) {
+    getFileMap(dataRegionId, pipeName).remove(filePath);
+  }
+
+  /**
+   * Returns whether any tracked TsFile other than {@code tsFilePath} has a max progress index
+   * that is not after {@code progressIndex}.
+   */
+  public synchronized boolean isProgressIndexAfterOrEquals(
+      final int dataRegionId,
+      final String pipeName,
+      final String tsFilePath,
+      final ProgressIndex progressIndex) {
+    return getFileMap(dataRegionId, pipeName).entrySet().stream()
+        .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
+        .map(Entry::getValue)
+        .filter(Objects::nonNull)
+        .anyMatch(pair -> !pair.getLeft().getMaxProgressIndex().isAfter(progressIndex));
+  }
+
+  // Returns the tsFile path map of the pipe on the data region, creating it when absent.
+  private Map<String, Pair<TsFileResource, Long>> getFileMap(
+      final int dataRegionId, final String pipeName) {
+    return progressIndexKeeper
+        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>());
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTimePartitionProgressIndexKeeperHolder {
+
+    private static final PipeTsFileEpochProgressIndexAndFlushManager INSTANCE =
+        new PipeTsFileEpochProgressIndexAndFlushManager();
+
+    private PipeTimePartitionProgressIndexKeeperHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeTsFileEpochProgressIndexAndFlushManager getInstance() {
+    return PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+  }
+
+  private PipeTsFileEpochProgressIndexAndFlushManager() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
deleted file mode 100644
index ff7d90c377d..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
-
-import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PipeTsFileEpochProgressIndexKeeper {
-
-  // data region id -> pipeName -> tsFile path -> max progress index
-  private final Map<String, Map<String, Map<String, TsFileResource>>> 
progressIndexKeeper =
-      new ConcurrentHashMap<>();
-
-  public synchronized void registerProgressIndex(
-      final String dataRegionId, final String pipeName, final TsFileResource 
resource) {
-    progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .putIfAbsent(resource.getTsFilePath(), resource);
-  }
-
-  public synchronized void eliminateProgressIndex(
-      final String dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
-    progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .remove(filePath);
-  }
-
-  public synchronized boolean isProgressIndexAfterOrEquals(
-      final String dataRegionId,
-      final String pipeName,
-      final String tsFilePath,
-      final ProgressIndex progressIndex) {
-    return progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .entrySet()
-        .stream()
-        .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
-        .map(Entry::getValue)
-        .filter(Objects::nonNull)
-        .anyMatch(resource -> 
!resource.getMaxProgressIndex().isAfter(progressIndex));
-  }
-
-  //////////////////////////// singleton ////////////////////////////
-
-  private static class PipeTimePartitionProgressIndexKeeperHolder {
-
-    private static final PipeTsFileEpochProgressIndexKeeper INSTANCE =
-        new PipeTsFileEpochProgressIndexKeeper();
-
-    private PipeTimePartitionProgressIndexKeeperHolder() {
-      // empty constructor
-    }
-  }
-
-  public static PipeTsFileEpochProgressIndexKeeper getInstance() {
-    return 
PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
-  }
-
-  private PipeTsFileEpochProgressIndexKeeper() {
-    // empty constructor
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
index c5e98cd1b2c..d8fd295c9ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
@@ -30,47 +30,50 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class PipeTimePartitionListener {
 
-  private final Map<String, Map<String, PipeRealtimeDataRegionSource>> 
dataRegionId2Extractors =
+  private final Map<Integer, Map<String, PipeRealtimeDataRegionSource>> 
dataRegionId2Sources =
       new ConcurrentHashMap<>();
 
   // This variable is used to record the upper and lower bounds that each data 
region's time
   // partition ID has ever reached.
-  private final Map<String, Pair<Long, Long>> 
dataRegionId2TimePartitionIdBound =
+  private final Map<Integer, Pair<Long, Long>> 
dataRegionId2TimePartitionIdBound =
       new ConcurrentHashMap<>();
 
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListen(
-      String dataRegionId, PipeRealtimeDataRegionSource extractor) {
-    dataRegionId2Extractors
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
+    dataRegionId2Sources
         .computeIfAbsent(dataRegionId, o -> new HashMap<>())
-        .put(extractor.getTaskID(), extractor);
-    // Assign the previously recorded upper and lower bounds of time partition 
to the extractor that
+        .put(source.getTaskID(), source);
+    // Assign the previously recorded upper and lower bounds of time partition 
to the source that
     // has just started listening to the growth of time partition.
-    Pair<Long, Long> timePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
+    final Pair<Long, Long> timePartitionIdBound =
+        dataRegionId2TimePartitionIdBound.get(dataRegionId);
     if (Objects.nonNull(timePartitionIdBound)) {
-      extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound);
+      source.setDataRegionTimePartitionIdBound(timePartitionIdBound);
     }
   }
 
-  public synchronized void stopListen(String dataRegionId, 
PipeRealtimeDataRegionSource extractor) {
-    Map<String, PipeRealtimeDataRegionSource> extractors =
-        dataRegionId2Extractors.get(dataRegionId);
-    if (Objects.isNull(extractors)) {
+  public synchronized void stopListen(
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
+    final Map<String, PipeRealtimeDataRegionSource> sources =
+        dataRegionId2Sources.get(dataRegionId);
+    if (Objects.isNull(sources)) {
       return;
     }
-    extractors.remove(extractor.getTaskID());
-    if (extractors.isEmpty()) {
-      dataRegionId2Extractors.remove(dataRegionId);
+    sources.remove(source.getTaskID());
+    if (sources.isEmpty()) {
+      dataRegionId2Sources.remove(dataRegionId);
     }
   }
 
   //////////////////////////// listen to changes ////////////////////////////
 
   public synchronized void listenToTimePartitionGrow(
-      String dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
+      final int dataRegionId, final Pair<Long, Long> newTimePartitionIdBound) {
     boolean shouldBroadcastTimePartitionChange = false;
-    Pair<Long, Long> oldTimePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
+    final Pair<Long, Long> oldTimePartitionIdBound =
+        dataRegionId2TimePartitionIdBound.get(dataRegionId);
 
     if (Objects.isNull(oldTimePartitionIdBound)) {
       dataRegionId2TimePartitionIdBound.put(dataRegionId, 
newTimePartitionIdBound);
@@ -86,14 +89,15 @@ public class PipeTimePartitionListener {
     }
 
     if (shouldBroadcastTimePartitionChange) {
-      Map<String, PipeRealtimeDataRegionSource> extractors =
-          dataRegionId2Extractors.get(dataRegionId);
-      if (Objects.isNull(extractors)) {
+      final Map<String, PipeRealtimeDataRegionSource> sources =
+          dataRegionId2Sources.get(dataRegionId);
+      if (Objects.isNull(sources)) {
         return;
       }
-      Pair<Long, Long> timePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
-      extractors.forEach(
-          (id, extractor) -> 
extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound));
+      final Pair<Long, Long> timePartitionIdBound =
+          dataRegionId2TimePartitionIdBound.get(dataRegionId);
+      sources.forEach(
+          (id, source) -> 
source.setDataRegionTimePartitionIdBound(timePartitionIdBound));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 2e1161977f8..5ded82a36ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -67,7 +67,7 @@ public class TimePartitionManager {
       // PipeInsertionDataNodeListener.listenToInsertNode.
       PipeTimePartitionListener.getInstance()
           .listenToTimePartitionGrow(
-              String.valueOf(timePartitionInfo.dataRegionId.getId()),
+              timePartitionInfo.dataRegionId.getId(),
               new Pair<>(
                   timePartitionInfoMapForRegion.firstKey(),
                   timePartitionInfoMapForRegion.lastKey()));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cf68da89553..2e600029241 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -322,6 +322,9 @@ public class CommonConfig {
   private volatile int pipeTsFilePinMaxLogNumPerRound = 10;
   private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
 
+  // <= 0 means disabled
+  private volatile long pipeTsFileFlushIntervalSeconds = 5 * 60L;
+
   private volatile boolean pipeMemoryManagementEnabled = true;
   private volatile long pipeMemoryAllocateRetryIntervalMs = 50;
   private volatile int pipeMemoryAllocateMaxRetries = 10;
@@ -1783,6 +1786,18 @@ public class CommonConfig {
         "pipeTsFilePinMaxLogIntervalRounds is set to {}", 
pipeTsFilePinMaxLogIntervalRounds);
   }
 
+  public long getPipeTsFileFlushIntervalSeconds() {
+    return pipeTsFileFlushIntervalSeconds;
+  }
+
+  public void setPipeTsFileFlushIntervalSeconds(long 
pipeTsFileFlushIntervalSeconds) {
+    if (this.pipeTsFileFlushIntervalSeconds == pipeTsFileFlushIntervalSeconds) 
{
+      return;
+    }
+    this.pipeTsFileFlushIntervalSeconds = pipeTsFileFlushIntervalSeconds;
+    logger.info("pipeTsFileFlushIntervalSeconds is set to {}", 
pipeTsFileFlushIntervalSeconds);
+  }
+
   public boolean getPipeMemoryManagementEnabled() {
     return pipeMemoryManagementEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a49caa53368..16539d82d8a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -387,6 +387,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
   }
 
+  public long getPipeTsFileFlushIntervalSeconds() {
+    return COMMON_CONFIG.getPipeTsFileFlushIntervalSeconds();
+  }
+
   public long getPipeLoggerCacheMaxSizeInBytes() {
     return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 832517b9745..6e72b4919fb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -131,6 +131,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_tsfile_pin_max_log_interval_rounds",
                 
String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds()))));
+    config.setPipeTsFileFlushIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_tsfile_flush_interval_seconds",
+                String.valueOf(config.getPipeTsFileFlushIntervalSeconds()))));
     config.setPipeMemoryManagementEnabled(
         Boolean.parseBoolean(
             properties.getProperty(

Reply via email to