This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-flush by this push:
     new 6ee24e24dff refactor
6ee24e24dff is described below

commit 6ee24e24dfffbb3e82b078fc2744555d0155798f
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 16:05:53 2026 +0800

    refactor
---
 .../tsfile/PipeCompactedTsFileInsertionEvent.java  |  8 ++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  7 +--
 .../PipeRealtimeDataRegionHybridSource.java        | 10 ++---
 .../realtime/PipeRealtimeDataRegionSource.java     | 10 ++---
 .../PipeRealtimeDataRegionTsFileSource.java        |  6 +--
 ...peTsFileEpochProgressIndexAndFlushManager.java} | 46 ++++++++++++++------
 .../listener/PipeTimePartitionListener.java        | 50 ++++++++++++----------
 .../rescon/memory/TimePartitionManager.java        |  2 +-
 8 files changed, 82 insertions(+), 57 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index bf3f2c97acb..078ce718488 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
 
 public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent {
 
-  private final String dataRegionId;
+  private final int dataRegionId;
   private final Set<String> originFilePaths;
   private final List<Long> commitIds;
 
@@ -70,7 +70,7 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
         anyOfOriginalEvents.getStartTime(),
         anyOfOriginalEvents.getEndTime());
 
-    this.dataRegionId = String.valueOf(committerKey.getRegionId());
+    this.dataRegionId = committerKey.getRegionId();
     this.originFilePaths =
         originalEvents.stream()
             .map(PipeTsFileInsertionEvent::getTsFile)
@@ -186,7 +186,7 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex)) {
       for (final String originFilePath : originFilePaths) {
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 6b3e505d2ea..7b07e084a7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -45,7 +45,7 @@ import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -399,8 +399,9 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
 
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .eliminateProgressIndex(resource.getDataRegionId(), pipeName, 
resource.getTsFilePath());
+      PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+          .eliminateProgressIndex(
+              Integer.parseInt(resource.getDataRegionId()), pipeName, 
resource.getTsFilePath());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 64fd0846465..f3457ac79f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -82,7 +82,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
 
     if (canNotUseTabletAnymore(event)) {
       event.getTsFileEpoch().migrateState(this, curState -> 
TsFileEpoch.State.USING_TSFILE);
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
+      PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
           .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
     } else {
       event
@@ -169,14 +169,14 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     switch (state) {
       case USING_TABLET:
         // If the state is USING_TABLET, discard the event
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
         
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
         return;
       case EMPTY:
       case USING_TSFILE:
       case USING_BOTH:
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .markAsExtracted(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
         if (!pendingQueue.waitedOffer(event)) {
           // This would not happen, but just in case.
@@ -312,7 +312,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       LOGGER.error(errorMessage);
       PipeDataNodeAgent.runtime()
           .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-      PipeTsFileEpochProgressIndexKeeper.getInstance()
+      PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
           .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
       return null;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index cd00e2975a0..3dec073f057 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -39,7 +39,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeTimePartitionListener;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements 
PipeExtractor {
 
   protected String pipeName;
   protected long creationTime;
-  protected String dataRegionId;
+  protected int dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
   protected boolean shouldExtractInsertion;
@@ -214,7 +214,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight();
 
     pipeName = environment.getPipeName();
-    dataRegionId = String.valueOf(environment.getRegionId());
+    dataRegionId = environment.getRegionId();
     pipeTaskMeta = environment.getPipeTaskMeta();
 
     // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. 
These metrics are
@@ -555,7 +555,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     return skipIfNoPrivileges;
   }
 
-  public final String getDataRegionId() {
+  public final int getDataRegionId() {
     return dataRegionId;
   }
 
@@ -595,7 +595,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   }
 
   private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent 
event) {
-    if (PipeTsFileEpochProgressIndexKeeper.getInstance()
+    if (PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
         .isProgressIndexAfterOrEquals(
             dataRegionId,
             pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 8f74bc63fb3..f6685ced073 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
     }
 
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-    PipeTsFileEpochProgressIndexKeeper.getInstance()
+    PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
         .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
@@ -116,7 +116,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
         LOGGER.error(errorMessage);
         PipeDataNodeAgent.runtime()
             .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-        PipeTsFileEpochProgressIndexKeeper.getInstance()
+        PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
             .eliminateProgressIndex(
                 dataRegionId, pipeName, 
realtimeEvent.getTsFileEpoch().getFilePath());
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
similarity index 62%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
index fbb6502ab17..27d549daf61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.utils.Pair;
@@ -31,14 +34,14 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class PipeTsFileEpochProgressIndexKeeper {
+public class PipeTsFileEpochProgressIndexAndFlushManager {
 
   // data region id -> pipeName -> tsFile path -> max progress index
-  private final Map<String, Map<String, Map<String, Pair<TsFileResource, 
Long>>>>
+  private final Map<Integer, Map<String, Map<String, Pair<TsFileResource, 
Long>>>>
       progressIndexKeeper = new ConcurrentHashMap<>();
 
   public synchronized void registerProgressIndex(
-      final String dataRegionId, final String pipeName, final TsFileResource 
resource) {
+      final int dataRegionId, final String pipeName, final TsFileResource 
resource) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -46,7 +49,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
   }
 
   public synchronized void markAsExtracted(
-      final String dataRegionId, final String pipeName, final String filePath) 
{
+      final int dataRegionId, final String pipeName, final String filePath) {
     final Pair<TsFileResource, Long> pair =
         progressIndexKeeper
             .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
@@ -57,12 +60,28 @@ public class PipeTsFileEpochProgressIndexKeeper {
     }
   }
 
-  public void flushAllTsFiles(final String dataRegionId) {
-
+  public void flushAllTimeoutTsFiles() {
+    progressIndexKeeper.forEach(
+        (regionId, map) ->
+            map.values()
+                .forEach(
+                    fileMap ->
+                        fileMap.forEach(
+                            (path, pair) -> {
+                              if (System.currentTimeMillis()
+                                      - 
PipeConfig.getInstance().getPipeTsFileFlushIntervalSeconds()
+                                          * 1000L
+                                  >= pair.getRight()) {
+                                StorageEngine.getInstance()
+                                    .getDataRegion(new DataRegionId(regionId))
+                                    .asyncCloseOneTsFileProcessor(
+                                        pair.getLeft().isSeq(), 
pair.getLeft().getProcessor());
+                              }
+                            })));
   }
 
   public synchronized void eliminateProgressIndex(
-      final String dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
+      final int dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -70,7 +89,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
   }
 
   public synchronized boolean isProgressIndexAfterOrEquals(
-      final String dataRegionId,
+      final int dataRegionId,
       final String pipeName,
       final String tsFilePath,
       final ProgressIndex progressIndex) {
@@ -89,19 +108,20 @@ public class PipeTsFileEpochProgressIndexKeeper {
 
   private static class PipeTimePartitionProgressIndexKeeperHolder {
 
-    private static final PipeTsFileEpochProgressIndexKeeper INSTANCE =
-        new PipeTsFileEpochProgressIndexKeeper();
+    private static final PipeTsFileEpochProgressIndexAndFlushManager INSTANCE =
+        new PipeTsFileEpochProgressIndexAndFlushManager();
 
     private PipeTimePartitionProgressIndexKeeperHolder() {
       // empty constructor
     }
   }
 
-  public static PipeTsFileEpochProgressIndexKeeper getInstance() {
-    return 
PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+  public static PipeTsFileEpochProgressIndexAndFlushManager getInstance() {
+    return 
PipeTsFileEpochProgressIndexAndFlushManager.PipeTimePartitionProgressIndexKeeperHolder
+        .INSTANCE;
   }
 
-  private PipeTsFileEpochProgressIndexKeeper() {
+  private PipeTsFileEpochProgressIndexAndFlushManager() {
     // empty constructor
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
index c5e98cd1b2c..d8fd295c9ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
@@ -30,47 +30,50 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class PipeTimePartitionListener {
 
-  private final Map<String, Map<String, PipeRealtimeDataRegionSource>> 
dataRegionId2Extractors =
+  private final Map<Integer, Map<String, PipeRealtimeDataRegionSource>> 
dataRegionId2Sources =
       new ConcurrentHashMap<>();
 
   // This variable is used to record the upper and lower bounds that each data 
region's time
   // partition ID has ever reached.
-  private final Map<String, Pair<Long, Long>> 
dataRegionId2TimePartitionIdBound =
+  private final Map<Integer, Pair<Long, Long>> 
dataRegionId2TimePartitionIdBound =
       new ConcurrentHashMap<>();
 
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListen(
-      String dataRegionId, PipeRealtimeDataRegionSource extractor) {
-    dataRegionId2Extractors
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
+    dataRegionId2Sources
         .computeIfAbsent(dataRegionId, o -> new HashMap<>())
-        .put(extractor.getTaskID(), extractor);
-    // Assign the previously recorded upper and lower bounds of time partition 
to the extractor that
+        .put(source.getTaskID(), source);
+    // Assign the previously recorded upper and lower bounds of time partition 
to the source that
     // has just started listening to the growth of time partition.
-    Pair<Long, Long> timePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
+    final Pair<Long, Long> timePartitionIdBound =
+        dataRegionId2TimePartitionIdBound.get(dataRegionId);
     if (Objects.nonNull(timePartitionIdBound)) {
-      extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound);
+      source.setDataRegionTimePartitionIdBound(timePartitionIdBound);
     }
   }
 
-  public synchronized void stopListen(String dataRegionId, 
PipeRealtimeDataRegionSource extractor) {
-    Map<String, PipeRealtimeDataRegionSource> extractors =
-        dataRegionId2Extractors.get(dataRegionId);
-    if (Objects.isNull(extractors)) {
+  public synchronized void stopListen(
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
+    final Map<String, PipeRealtimeDataRegionSource> sources =
+        dataRegionId2Sources.get(dataRegionId);
+    if (Objects.isNull(sources)) {
       return;
     }
-    extractors.remove(extractor.getTaskID());
-    if (extractors.isEmpty()) {
-      dataRegionId2Extractors.remove(dataRegionId);
+    sources.remove(source.getTaskID());
+    if (sources.isEmpty()) {
+      dataRegionId2Sources.remove(dataRegionId);
     }
   }
 
   //////////////////////////// listen to changes ////////////////////////////
 
   public synchronized void listenToTimePartitionGrow(
-      String dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
+      final int dataRegionId, final Pair<Long, Long> newTimePartitionIdBound) {
     boolean shouldBroadcastTimePartitionChange = false;
-    Pair<Long, Long> oldTimePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
+    final Pair<Long, Long> oldTimePartitionIdBound =
+        dataRegionId2TimePartitionIdBound.get(dataRegionId);
 
     if (Objects.isNull(oldTimePartitionIdBound)) {
       dataRegionId2TimePartitionIdBound.put(dataRegionId, 
newTimePartitionIdBound);
@@ -86,14 +89,15 @@ public class PipeTimePartitionListener {
     }
 
     if (shouldBroadcastTimePartitionChange) {
-      Map<String, PipeRealtimeDataRegionSource> extractors =
-          dataRegionId2Extractors.get(dataRegionId);
-      if (Objects.isNull(extractors)) {
+      final Map<String, PipeRealtimeDataRegionSource> sources =
+          dataRegionId2Sources.get(dataRegionId);
+      if (Objects.isNull(sources)) {
         return;
       }
-      Pair<Long, Long> timePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
-      extractors.forEach(
-          (id, extractor) -> 
extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound));
+      final Pair<Long, Long> timePartitionIdBound =
+          dataRegionId2TimePartitionIdBound.get(dataRegionId);
+      sources.forEach(
+          (id, source) -> 
source.setDataRegionTimePartitionIdBound(timePartitionIdBound));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 2e1161977f8..5ded82a36ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -67,7 +67,7 @@ public class TimePartitionManager {
       // PipeInsertionDataNodeListener.listenToInsertNode.
       PipeTimePartitionListener.getInstance()
           .listenToTimePartitionGrow(
-              String.valueOf(timePartitionInfo.dataRegionId.getId()),
+              timePartitionInfo.dataRegionId.getId(),
               new Pair<>(
                   timePartitionInfoMapForRegion.firstKey(),
                   timePartitionInfoMapForRegion.lastKey()));

Reply via email to