This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-flush by this push:
     new 34d59fa9c89 cx
34d59fa9c89 is described below

commit 34d59fa9c89a95395f4607ec9104a6ccc83dbd1e
Author: Caideyipi <[email protected]>
AuthorDate: Sat Feb 14 09:44:12 2026 +0800

    cx
---
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    |  2 +-
 .../PipeRealtimeDataRegionTsFileSource.java        |  3 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  | 53 ++++++++++------------
 .../listener/PipeInsertionDataNodeListener.java    | 24 +++++-----
 .../apache/iotdb/commons/conf/CommonConfig.java    |  2 +-
 5 files changed, 40 insertions(+), 44 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 3351958d2a1..ca21821d74e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -653,7 +653,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
       TestUtils.assertDataEventuallyOnEnv(
           receiverEnv,
           "select count(*) from root.db*.**",
-          "count(root.db1.d1.at1),count(root.db2.d1.at1),",
+          "count(root.db1.d1.at1),count(root.db1.d2.at1),",
           Collections.singleton("2,2,"),
           60);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index be1f648d163..352ca3349ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -89,7 +89,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
 
   @Override
   public boolean isNeedListenToInsertNode() {
-    return false;
+    // Mark the tsFile to flush it
+    return shouldExtractInsertion;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e60e1ad898d..f3d1e258395 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -140,17 +140,15 @@ public class PipeDataRegionAssigner implements Closeable {
     matchedAndUnmatched
         .getLeft()
         .forEach(
-            extractor -> {
+            source -> {
               if (disruptor.isClosed()) {
                 return;
               }
 
-              if (event.getEvent().isGeneratedByPipe() && 
!extractor.isForwardingPipeRequests()) {
+              if (event.getEvent().isGeneratedByPipe() && 
!source.isForwardingPipeRequests()) {
                 final ProgressReportEvent reportEvent =
                     new ProgressReportEvent(
-                        extractor.getPipeName(),
-                        extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta());
+                        source.getPipeName(), source.getCreationTime(), 
source.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if 
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
@@ -158,33 +156,33 @@ public class PipeDataRegionAssigner implements Closeable {
                       reportEvent);
                   return;
                 }
-                
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+                
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
                 return;
               }
 
               final PipeRealtimeEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                      extractor.getPipeName(),
-                      extractor.getCreationTime(),
-                      extractor.getPipeTaskMeta(),
-                      extractor.getTreePattern(),
-                      extractor.getTablePattern(),
-                      String.valueOf(extractor.getUserId()),
-                      extractor.getUserName(),
-                      extractor.getCliHostname(),
-                      extractor.isSkipIfNoPrivileges(),
-                      extractor.getRealtimeDataExtractionStartTime(),
-                      extractor.getRealtimeDataExtractionEndTime());
+                      source.getPipeName(),
+                      source.getCreationTime(),
+                      source.getPipeTaskMeta(),
+                      source.getTreePattern(),
+                      source.getTablePattern(),
+                      String.valueOf(source.getUserId()),
+                      source.getUserName(),
+                      source.getCliHostname(),
+                      source.isSkipIfNoPrivileges(),
+                      source.getRealtimeDataExtractionStartTime(),
+                      source.getRealtimeDataExtractionEndTime());
               final EnrichedEvent innerEvent = copiedEvent.getEvent();
               // if using IoTV2, assign a replicateIndex for this realtime 
event
               if (DataRegionConsensusImpl.getInstance() instanceof 
PipeConsensus
                   && PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
                 innerEvent.setReplicateIndexForIoTV2(
                     
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
-                        extractor.getPipeName()));
+                        source.getPipeName()));
                 LOGGER.debug(
                     "[{}]Set {} for realtime event {}",
-                    extractor.getPipeName(),
+                    source.getPipeName(),
                     innerEvent.getReplicateIndexForIoTV2(),
                     innerEvent);
               }
@@ -192,15 +190,14 @@ public class PipeDataRegionAssigner implements Closeable {
               if (innerEvent instanceof PipeTsFileInsertionEvent) {
                 final PipeTsFileInsertionEvent tsFileInsertionEvent =
                     (PipeTsFileInsertionEvent) innerEvent;
-                tsFileInsertionEvent.disableMod4NonTransferPipes(
-                    extractor.isShouldTransferModFile());
+                
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
               }
 
               if (innerEvent instanceof PipeDeleteDataNodeEvent) {
                 final PipeDeleteDataNodeEvent deleteDataNodeEvent =
                     (PipeDeleteDataNodeEvent) innerEvent;
                 final DeletionResourceManager manager =
-                    
DeletionResourceManager.getInstance(extractor.getDataRegionId());
+                    
DeletionResourceManager.getInstance(source.getDataRegionId());
                 // increase deletion resource's reference and bind real 
deleteEvent
                 if (Objects.nonNull(manager)
                     && DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(
@@ -217,13 +214,13 @@ public class PipeDataRegionAssigner implements Closeable {
                     copiedEvent);
                 return;
               }
-              extractor.extract(copiedEvent);
+              source.extract(copiedEvent);
             });
 
     matchedAndUnmatched
         .getRight()
         .forEach(
-            extractor -> {
+            source -> {
               if (disruptor.isClosed()) {
                 return;
               }
@@ -233,9 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
                   || innerEvent instanceof TsFileInsertionEvent) {
                 final ProgressReportEvent reportEvent =
                     new ProgressReportEvent(
-                        extractor.getPipeName(),
-                        extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta());
+                        source.getPipeName(), source.getCreationTime(), 
source.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if 
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
@@ -243,7 +238,7 @@ public class PipeDataRegionAssigner implements Closeable {
                       reportEvent);
                   return;
                 }
-                
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+                
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
               }
             });
   }
@@ -260,7 +255,7 @@ public class PipeDataRegionAssigner implements Closeable {
     matcher.invalidateCache();
   }
 
-  public boolean notMoreExtractorNeededToBeAssigned() {
+  public boolean notMoreSourceNeededToBeAssigned() {
     return matcher.getRegisterCount() == 0;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index d255d80166a..d63cadeb822 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>All events extracted by this listener will be first published to 
different
  * PipeEventDataRegionAssigners (identified by data region id), and then 
PipeEventDataRegionAssigner
- * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
+ * will filter events and assign them to different 
PipeRealtimeEventDataRegionSources.
  */
 public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<Integer, PipeDataRegionAssigner> 
dataRegionId2Assigner =
@@ -57,36 +57,36 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
-        .startAssignTo(extractor);
+        .startAssignTo(source);
 
-    if (extractor.isNeedListenToTsFile()) {
+    if (source.isNeedListenToTsFile()) {
       listenToTsFileSourceCount.incrementAndGet();
     }
-    if (extractor.isNeedListenToInsertNode()) {
+    if (source.isNeedListenToInsertNode()) {
       listenToInsertNodeSourceCount.incrementAndGet();
     }
   }
 
   public synchronized void stopListenAndAssign(
-      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
     if (assigner == null) {
       return;
     }
 
-    assigner.stopAssignTo(extractor);
+    assigner.stopAssignTo(source);
 
-    if (extractor.isNeedListenToTsFile()) {
+    if (source.isNeedListenToTsFile()) {
       listenToTsFileSourceCount.decrementAndGet();
     }
-    if (extractor.isNeedListenToInsertNode()) {
+    if (source.isNeedListenToInsertNode()) {
       listenToInsertNodeSourceCount.decrementAndGet();
     }
 
-    if (assigner.notMoreExtractorNeededToBeAssigned()) {
+    if (assigner.notMoreSourceNeededToBeAssigned()) {
       // The removed assigner will is the same as the one referenced by the 
variable `assigner`
       dataRegionId2Assigner.remove(dataRegionId);
       // This will help to release the memory occupied by the assigner
@@ -101,8 +101,8 @@ public class PipeInsertionDataNodeListener {
       final String databaseName,
       final TsFileResource tsFileResource,
       final boolean isLoaded) {
-    // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on 
purpose
-    // because extractors may use tsfile events when some exceptions occur in 
the
+    // We don't judge whether listenToTsFileSourceCount.get() == 0 here on 
purpose
+    // because sources may use tsfile events when some exceptions occur in the
     // insert nodes listening process.
 
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 2e600029241..42edc8b8fe7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -323,7 +323,7 @@ public class CommonConfig {
   private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
 
   // <= 0 means disabled
-  private volatile long pipeTsFileFlushIntervalSeconds = 5 * 60L;
+  private volatile long pipeTsFileFlushIntervalSeconds = 20L;
 
   private volatile boolean pipeMemoryManagementEnabled = true;
   private volatile long pipeMemoryAllocateRetryIntervalMs = 50;

Reply via email to