This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 906b86f784b Pipe: Do not listen to tsFiles when no sources need 
(#17669)
906b86f784b is described below

commit 906b86f784bc60b515b3d51347b18478b103803c
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 25 10:23:41 2026 +0800

    Pipe: Do not listen to tsFiles when no sources need (#17669)
    
    * assigner
    
    * Update PipeDataRegionAssigner.java
    
    * fix
---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 44 +++++++++++-
 .../listener/PipeInsertionDataNodeListener.java    | 47 ++++---------
 .../db/pipe/source/PipeRealtimeExtractTest.java    | 81 ++++++++++++++++++++++
 3 files changed, 137 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 58a5daabf0e..3f84e138a48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -67,6 +67,9 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private Boolean isTableModel;
 
+  private volatile int listenToTsFileSourceCount = 0;
+  private volatile int listenToInsertNodeSourceCount = 0;
+
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
 
   public int getDataRegionId() {
@@ -225,12 +228,34 @@ public class PipeDataRegionAssigner implements Closeable {
             });
   }
 
-  public void startAssignTo(final PipeRealtimeDataRegionSource source) {
+  public synchronized void startAssignTo(final PipeRealtimeDataRegionSource 
source) {
     matcher.register(source);
+    if (source.isNeedListenToTsFile()) {
+      listenToTsFileSourceCount++;
+    }
+    if (source.isNeedListenToInsertNode()) {
+      listenToInsertNodeSourceCount++;
+    }
+    logSourceAssignmentChange("registered", source);
   }
 
-  public void stopAssignTo(final PipeRealtimeDataRegionSource source) {
+  public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource 
source) {
     matcher.deregister(source);
+    if (source.isNeedListenToTsFile()) {
+      listenToTsFileSourceCount--;
+    }
+    if (source.isNeedListenToInsertNode()) {
+      listenToInsertNodeSourceCount--;
+    }
+    logSourceAssignmentChange("deregistered", source);
+  }
+
+  public boolean shouldListenToTsFile() {
+    return listenToTsFileSourceCount > 0;
+  }
+
+  public boolean shouldListenToInsertNode() {
+    return listenToInsertNodeSourceCount > 0;
   }
 
   public void invalidateCache() {
@@ -272,6 +297,21 @@ public class PipeDataRegionAssigner implements Closeable {
     return eventCounter.getPipeHeartbeatEventCount();
   }
 
+  private void logSourceAssignmentChange(
+      final String action, final PipeRealtimeDataRegionSource source) {
+    LOGGER.info(
+        "Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, 
listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, 
insertNodeSourceCount={}).",
+        source.getPipeName(),
+        source.getCreationTime(),
+        action,
+        dataRegionId,
+        source.isNeedListenToTsFile(),
+        source.isNeedListenToInsertNode(),
+        matcher.getRegisterCount(),
+        listenToTsFileSourceCount,
+        listenToInsertNodeSourceCount);
+  }
+
   public Boolean isTableModel() {
     return isTableModel;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 157fb0078e3..76c001b2696 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * PipeInsertionEventListener is a singleton in each data node.
@@ -51,23 +50,20 @@ public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<Integer, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
-  private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
-  private final AtomicInteger listenToInsertNodeSourceCount = new 
AtomicInteger(0);
-
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
       final int dataRegionId, final PipeRealtimeDataRegionSource source) {
-    dataRegionId2Assigner
-        .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
-        .startAssignTo(source);
-
-    if (source.isNeedListenToTsFile()) {
-      listenToTsFileSourceCount.incrementAndGet();
-    }
-    if (source.isNeedListenToInsertNode()) {
-      listenToInsertNodeSourceCount.incrementAndGet();
-    }
+    // Keep registration inside compute so the assigner is fully started 
before it becomes visible
+    // to concurrent listeners.
+    dataRegionId2Assigner.compute(
+        dataRegionId,
+        (id, assigner) -> {
+          final PipeDataRegionAssigner actualAssigner =
+              assigner == null ? new PipeDataRegionAssigner(dataRegionId) : 
assigner;
+          actualAssigner.startAssignTo(source);
+          return actualAssigner;
+        });
   }
 
   public synchronized void stopListenAndAssign(
@@ -82,13 +78,6 @@ public class PipeInsertionDataNodeListener {
 
       assigner.stopAssignTo(source);
 
-      if (source.isNeedListenToTsFile()) {
-        listenToTsFileSourceCount.decrementAndGet();
-      }
-      if (source.isNeedListenToInsertNode()) {
-        listenToInsertNodeSourceCount.decrementAndGet();
-      }
-
       if (assigner.notMoreSourceNeededToBeAssigned()) {
         // The removed assigner will is the same as the one referenced by the 
variable `assigner`
         dataRegionId2Assigner.remove(dataRegionId);
@@ -110,14 +99,10 @@ public class PipeInsertionDataNodeListener {
       final String databaseName,
       final TsFileResource tsFileResource,
       final boolean isLoaded) {
-    // We don't judge whether listenToTsFileSourceCount.get() == 0 here on 
purpose
-    // because spirces may use tsfile events when some exceptions occur in the
-    // insert nodes listening process.
-
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
-    // only events from registered data region will be extracted
-    if (assigner == null) {
+    // only events from registered data region with tsfile listeners will be 
extracted
+    if (assigner == null || !assigner.shouldListenToTsFile()) {
       return;
     }
 
@@ -131,14 +116,10 @@ public class PipeInsertionDataNodeListener {
       final String databaseName,
       final InsertNode insertNode,
       final TsFileResource tsFileResource) {
-    if (listenToInsertNodeSourceCount.get() == 0) {
-      return;
-    }
-
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
-    // only events from registered data region will be extracted
-    if (assigner == null) {
+    // only events from registered data region with insert listeners will be 
extracted
+    if (assigner == null || !assigner.shouldListenToInsertNode()) {
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index d3adbf9aaad..b793e8ca699 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvi
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
@@ -40,6 +41,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
@@ -64,6 +66,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 public class PipeRealtimeExtractTest {
@@ -261,6 +264,52 @@ public class PipeRealtimeExtractTest {
     }
   }
 
+  @Test
+  public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws 
Exception {
+    try (final NoTsFileRealtimeDataRegionSource extractor =
+        new NoTsFileRealtimeDataRegionSource()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1);
+                }
+              });
+      final PipeTaskRuntimeConfiguration configuration =
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskSourceRuntimeEnvironment(
+                  "1", 1, dataRegion1, new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+
+      extractor.validate(new PipeParameterValidator(parameters));
+      extractor.customize(parameters, configuration);
+      extractor.start();
+
+      final File dataRegionDir =
+          new File(tsFileDir.getPath() + File.separator + dataRegion1 + 
File.separator + "0");
+      final boolean ignored = dataRegionDir.mkdirs();
+      final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile");
+      Assert.assertTrue(tsFile.createNewFile());
+
+      final TsFileResource resource = new TsFileResource(tsFile);
+      resource.updateStartTime(
+          IDeviceID.Factory.DEFAULT_FACTORY.create(
+              String.join(TsFileConstant.PATH_SEPARATOR, device)),
+          0);
+      resource.close();
+
+      PipeInsertionDataNodeListener.getInstance()
+          .listenToTsFile(dataRegion1, Integer.toString(dataRegion1), 
resource, false);
+
+      final long deadline = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(1);
+      while (System.currentTimeMillis() < deadline
+          && extractor.getObservedTsFileEventCount() == 0) {
+        TimeUnit.MILLISECONDS.sleep(10);
+      }
+
+      Assert.assertEquals(0, extractor.getObservedTsFileEventCount());
+    }
+  }
+
   private Future<?> write2DataRegion(
       final int writeNum, final int dataRegionId, final int startNum) {
     final File dataRegionDir =
@@ -348,4 +397,36 @@ public class PipeRealtimeExtractTest {
           }
         });
   }
+
+  private static class NoTsFileRealtimeDataRegionSource extends 
PipeRealtimeDataRegionSource {
+
+    private final AtomicInteger observedTsFileEventCount = new 
AtomicInteger(0);
+
+    @Override
+    public Event supply() {
+      return null;
+    }
+
+    @Override
+    protected void doExtract(final PipeRealtimeEvent event) {
+      if (event.getEvent() instanceof TsFileInsertionEvent) {
+        observedTsFileEventCount.incrementAndGet();
+      }
+      
event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), 
false);
+    }
+
+    @Override
+    public boolean isNeedListenToTsFile() {
+      return false;
+    }
+
+    @Override
+    public boolean isNeedListenToInsertNode() {
+      return false;
+    }
+
+    private int getObservedTsFileEventCount() {
+      return observedTsFileEventCount.get();
+    }
+  }
 }

Reply via email to