This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7721cab0392 Pipe: Do not listen to tsFiles when no sources need 
(#17669) (#17761)
7721cab0392 is described below

commit 7721cab039215b892c718d1a575e9f6ee71f8b46
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 15:47:21 2026 +0800

    Pipe: Do not listen to tsFiles when no sources need (#17669) (#17761)
    
    * assigner
    
    * Update PipeDataRegionAssigner.java
    
    * fix
    
    (cherry picked from commit 906b86f784bc60b515b3d51347b18478b103803c)
---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 48 +++++++++++--
 .../listener/PipeInsertionDataNodeListener.java    | 47 ++++---------
 .../db/pipe/source/PipeRealtimeExtractTest.java    | 81 ++++++++++++++++++++++
 3 files changed, 139 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 0b4eb547144..2375726e427 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -56,6 +56,9 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private final String dataRegionId;
 
+  private volatile int listenToTsFileSourceCount = 0;
+  private volatile int listenToInsertNodeSourceCount = 0;
+
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
 
   public String getDataRegionId() {
@@ -194,12 +197,34 @@ public class PipeDataRegionAssigner implements Closeable {
             });
   }
 
-  public void startAssignTo(final PipeRealtimeDataRegionSource extractor) {
-    matcher.register(extractor);
+  public synchronized void startAssignTo(final PipeRealtimeDataRegionSource 
source) {
+    matcher.register(source);
+    if (source.isNeedListenToTsFile()) {
+      listenToTsFileSourceCount++;
+    }
+    if (source.isNeedListenToInsertNode()) {
+      listenToInsertNodeSourceCount++;
+    }
+    logSourceAssignmentChange("registered", source);
+  }
+
+  public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource 
source) {
+    matcher.deregister(source);
+    if (source.isNeedListenToTsFile()) {
+      listenToTsFileSourceCount--;
+    }
+    if (source.isNeedListenToInsertNode()) {
+      listenToInsertNodeSourceCount--;
+    }
+    logSourceAssignmentChange("deregistered", source);
   }
 
-  public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) {
-    matcher.deregister(extractor);
+  public boolean shouldListenToTsFile() {
+    return listenToTsFileSourceCount > 0;
+  }
+
+  public boolean shouldListenToInsertNode() {
+    return listenToInsertNodeSourceCount > 0;
   }
 
   public boolean notMoreSourceNeededToBeAssigned() {
@@ -236,4 +261,19 @@ public class PipeDataRegionAssigner implements Closeable {
   public int getPipeHeartbeatEventCount() {
     return eventCounter.getPipeHeartbeatEventCount();
   }
+
+  private void logSourceAssignmentChange(
+      final String action, final PipeRealtimeDataRegionSource source) {
+    LOGGER.info(
+        "Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, 
listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, 
insertNodeSourceCount={}).",
+        source.getPipeName(),
+        source.getCreationTime(),
+        action,
+        dataRegionId,
+        source.isNeedListenToTsFile(),
+        source.isNeedListenToInsertNode(),
+        matcher.getRegisterCount(),
+        listenToTsFileSourceCount,
+        listenToInsertNodeSourceCount);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 882d4aff0d8..ad3586df830 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * PipeInsertionEventListener is a singleton in each data node.
@@ -48,23 +47,20 @@ public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
-  private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
-  private final AtomicInteger listenToInsertNodeSourceCount = new 
AtomicInteger(0);
-
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
       final String dataRegionId, final PipeRealtimeDataRegionSource source) {
-    dataRegionId2Assigner
-        .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
-        .startAssignTo(source);
-
-    if (source.isNeedListenToTsFile()) {
-      listenToTsFileSourceCount.incrementAndGet();
-    }
-    if (source.isNeedListenToInsertNode()) {
-      listenToInsertNodeSourceCount.incrementAndGet();
-    }
+    // Keep registration inside compute so the assigner is fully started 
before it becomes visible
+    // to concurrent listeners.
+    dataRegionId2Assigner.compute(
+        dataRegionId,
+        (id, assigner) -> {
+          final PipeDataRegionAssigner actualAssigner =
+              assigner == null ? new PipeDataRegionAssigner(dataRegionId) : 
assigner;
+          actualAssigner.startAssignTo(source);
+          return actualAssigner;
+        });
   }
 
   public synchronized void stopListenAndAssign(
@@ -79,13 +75,6 @@ public class PipeInsertionDataNodeListener {
 
       assigner.stopAssignTo(source);
 
-      if (source.isNeedListenToTsFile()) {
-        listenToTsFileSourceCount.decrementAndGet();
-      }
-      if (source.isNeedListenToInsertNode()) {
-        listenToInsertNodeSourceCount.decrementAndGet();
-      }
-
       if (assigner.notMoreSourceNeededToBeAssigned()) {
         // The removed assigner will is the same as the one referenced by the 
variable `assigner`
         dataRegionId2Assigner.remove(dataRegionId);
@@ -104,14 +93,10 @@ public class PipeInsertionDataNodeListener {
 
   public void listenToTsFile(
       final String dataRegionId, final TsFileResource tsFileResource, final 
boolean isLoaded) {
-    // We don't judge whether listenToTsFileSourceCount.get() == 0 here on 
purpose
-    // because sources may use tsfile events when some exceptions occur in the
-    // insert nodes listening process.
-
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
-    // only events from registered data region will be extracted
-    if (assigner == null) {
+    // only events from registered data region with tsfile listeners will be 
extracted
+    if (assigner == null || !assigner.shouldListenToTsFile()) {
       return;
     }
 
@@ -121,14 +106,10 @@ public class PipeInsertionDataNodeListener {
 
   public void listenToInsertNode(
       final String dataRegionId, final InsertNode insertNode, final 
TsFileResource tsFileResource) {
-    if (listenToInsertNodeSourceCount.get() == 0) {
-      return;
-    }
-
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
-    // only events from registered data region will be extracted
-    if (assigner == null) {
+    // only events from registered data region with insert listeners will be 
extracted
+    if (assigner == null || !assigner.shouldListenToInsertNode()) {
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index 9e5b42bb93c..2cba4c8eebc 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
@@ -39,6 +40,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
@@ -63,6 +65,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 public class PipeRealtimeExtractTest {
@@ -268,6 +271,52 @@ public class PipeRealtimeExtractTest {
     }
   }
 
+  @Test
+  public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws 
Exception {
+    try (final NoTsFileRealtimeDataRegionSource extractor =
+        new NoTsFileRealtimeDataRegionSource()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1);
+                }
+              });
+      final PipeTaskRuntimeConfiguration configuration =
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskSourceRuntimeEnvironment(
+                  "1",
+                  1,
+                  Integer.parseInt(dataRegion1),
+                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+
+      extractor.validate(new PipeParameterValidator(parameters));
+      extractor.customize(parameters, configuration);
+      extractor.start();
+
+      final File dataRegionDir =
+          new File(tsFileDir.getPath() + File.separator + dataRegion1 + 
File.separator + "0");
+      final boolean ignored = dataRegionDir.mkdirs();
+      final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile");
+      Assert.assertTrue(tsFile.createNewFile());
+
+      final TsFileResource resource = new TsFileResource(tsFile);
+      resource.updateStartTime(
+          new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR, 
device)), 0);
+      resource.close();
+
+      PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegion1, 
resource, false);
+
+      final long deadline = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(1);
+      while (System.currentTimeMillis() < deadline
+          && extractor.getObservedTsFileEventCount() == 0) {
+        TimeUnit.MILLISECONDS.sleep(10);
+      }
+
+      Assert.assertEquals(0, extractor.getObservedTsFileEventCount());
+    }
+  }
+
   private Future<?> write2DataRegion(
       final int writeNum, final String dataRegionId, final int startNum) {
     final File dataRegionDir =
@@ -351,4 +400,36 @@ public class PipeRealtimeExtractTest {
           }
         });
   }
+
+  private static class NoTsFileRealtimeDataRegionSource extends 
PipeRealtimeDataRegionSource {
+
+    private final AtomicInteger observedTsFileEventCount = new 
AtomicInteger(0);
+
+    @Override
+    public Event supply() {
+      return null;
+    }
+
+    @Override
+    protected void doExtract(final PipeRealtimeEvent event) {
+      if (event.getEvent() instanceof TsFileInsertionEvent) {
+        observedTsFileEventCount.incrementAndGet();
+      }
+      
event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), 
false);
+    }
+
+    @Override
+    public boolean isNeedListenToTsFile() {
+      return false;
+    }
+
+    @Override
+    public boolean isNeedListenToInsertNode() {
+      return false;
+    }
+
+    private int getObservedTsFileEventCount() {
+      return observedTsFileEventCount.get();
+    }
+  }
 }

Reply via email to