This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 906b86f784b Pipe: Do not listen to tsFiles when no sources need (#17669)
906b86f784b is described below
commit 906b86f784bc60b515b3d51347b18478b103803c
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 25 10:23:41 2026 +0800
Pipe: Do not listen to tsFiles when no sources need (#17669)
* assigner
* Update PipeDataRegionAssigner.java
* fix
---
.../realtime/assigner/PipeDataRegionAssigner.java | 44 +++++++++++-
.../listener/PipeInsertionDataNodeListener.java | 47 ++++---------
.../db/pipe/source/PipeRealtimeExtractTest.java | 81 ++++++++++++++++++++++
3 files changed, 137 insertions(+), 35 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 58a5daabf0e..3f84e138a48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -67,6 +67,9 @@ public class PipeDataRegionAssigner implements Closeable {
private Boolean isTableModel;
+ private volatile int listenToTsFileSourceCount = 0;
+ private volatile int listenToInsertNodeSourceCount = 0;
+
private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
public int getDataRegionId() {
@@ -225,12 +228,34 @@ public class PipeDataRegionAssigner implements Closeable {
});
}
- public void startAssignTo(final PipeRealtimeDataRegionSource source) {
+ public synchronized void startAssignTo(final PipeRealtimeDataRegionSource
source) {
matcher.register(source);
+ if (source.isNeedListenToTsFile()) {
+ listenToTsFileSourceCount++;
+ }
+ if (source.isNeedListenToInsertNode()) {
+ listenToInsertNodeSourceCount++;
+ }
+ logSourceAssignmentChange("registered", source);
}
- public void stopAssignTo(final PipeRealtimeDataRegionSource source) {
+ public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource
source) {
matcher.deregister(source);
+ if (source.isNeedListenToTsFile()) {
+ listenToTsFileSourceCount--;
+ }
+ if (source.isNeedListenToInsertNode()) {
+ listenToInsertNodeSourceCount--;
+ }
+ logSourceAssignmentChange("deregistered", source);
+ }
+
+ public boolean shouldListenToTsFile() {
+ return listenToTsFileSourceCount > 0;
+ }
+
+ public boolean shouldListenToInsertNode() {
+ return listenToInsertNodeSourceCount > 0;
}
public void invalidateCache() {
@@ -272,6 +297,21 @@ public class PipeDataRegionAssigner implements Closeable {
return eventCounter.getPipeHeartbeatEventCount();
}
+ private void logSourceAssignmentChange(
+ final String action, final PipeRealtimeDataRegionSource source) {
+ LOGGER.info(
+ "Pipe {}@{} {} realtime source on data region {} (listenToTsFile={},
listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={},
insertNodeSourceCount={}).",
+ source.getPipeName(),
+ source.getCreationTime(),
+ action,
+ dataRegionId,
+ source.isNeedListenToTsFile(),
+ source.isNeedListenToInsertNode(),
+ matcher.getRegisterCount(),
+ listenToTsFileSourceCount,
+ listenToInsertNodeSourceCount);
+ }
+
public Boolean isTableModel() {
return isTableModel;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 157fb0078e3..76c001b2696 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* PipeInsertionEventListener is a singleton in each data node.
@@ -51,23 +50,20 @@ public class PipeInsertionDataNodeListener {
private final ConcurrentMap<Integer, PipeDataRegionAssigner>
dataRegionId2Assigner =
new ConcurrentHashMap<>();
- private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
- private final AtomicInteger listenToInsertNodeSourceCount = new
AtomicInteger(0);
-
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
final int dataRegionId, final PipeRealtimeDataRegionSource source) {
- dataRegionId2Assigner
- .computeIfAbsent(dataRegionId, o -> new
PipeDataRegionAssigner(dataRegionId))
- .startAssignTo(source);
-
- if (source.isNeedListenToTsFile()) {
- listenToTsFileSourceCount.incrementAndGet();
- }
- if (source.isNeedListenToInsertNode()) {
- listenToInsertNodeSourceCount.incrementAndGet();
- }
+ // Keep registration inside compute so the assigner is fully started
before it becomes visible
+ // to concurrent listeners.
+ dataRegionId2Assigner.compute(
+ dataRegionId,
+ (id, assigner) -> {
+ final PipeDataRegionAssigner actualAssigner =
+ assigner == null ? new PipeDataRegionAssigner(dataRegionId) :
assigner;
+ actualAssigner.startAssignTo(source);
+ return actualAssigner;
+ });
}
public synchronized void stopListenAndAssign(
@@ -82,13 +78,6 @@ public class PipeInsertionDataNodeListener {
assigner.stopAssignTo(source);
- if (source.isNeedListenToTsFile()) {
- listenToTsFileSourceCount.decrementAndGet();
- }
- if (source.isNeedListenToInsertNode()) {
- listenToInsertNodeSourceCount.decrementAndGet();
- }
-
if (assigner.notMoreSourceNeededToBeAssigned()) {
// The removed assigner will is the same as the one referenced by the
variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
@@ -110,14 +99,10 @@ public class PipeInsertionDataNodeListener {
final String databaseName,
final TsFileResource tsFileResource,
final boolean isLoaded) {
- // We don't judge whether listenToTsFileSourceCount.get() == 0 here on
purpose
- // because spirces may use tsfile events when some exceptions occur in the
- // insert nodes listening process.
-
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
- // only events from registered data region will be extracted
- if (assigner == null) {
+ // only events from registered data region with tsfile listeners will be
extracted
+ if (assigner == null || !assigner.shouldListenToTsFile()) {
return;
}
@@ -131,14 +116,10 @@ public class PipeInsertionDataNodeListener {
final String databaseName,
final InsertNode insertNode,
final TsFileResource tsFileResource) {
- if (listenToInsertNodeSourceCount.get() == 0) {
- return;
- }
-
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
- // only events from registered data region will be extracted
- if (assigner == null) {
+ // only events from registered data region with insert listeners will be
extracted
+ if (assigner == null || !assigner.shouldListenToInsertNode()) {
return;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index d3adbf9aaad..b793e8ca699 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvi
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
@@ -40,6 +41,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
@@ -64,6 +66,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class PipeRealtimeExtractTest {
@@ -261,6 +264,52 @@ public class PipeRealtimeExtractTest {
}
}
+ @Test
+ public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws
Exception {
+ try (final NoTsFileRealtimeDataRegionSource extractor =
+ new NoTsFileRealtimeDataRegionSource()) {
+ final PipeParameters parameters =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1);
+ }
+ });
+ final PipeTaskRuntimeConfiguration configuration =
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskSourceRuntimeEnvironment(
+ "1", 1, dataRegion1, new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+
+ extractor.validate(new PipeParameterValidator(parameters));
+ extractor.customize(parameters, configuration);
+ extractor.start();
+
+ final File dataRegionDir =
+ new File(tsFileDir.getPath() + File.separator + dataRegion1 +
File.separator + "0");
+ final boolean ignored = dataRegionDir.mkdirs();
+ final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile");
+ Assert.assertTrue(tsFile.createNewFile());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.updateStartTime(
+ IDeviceID.Factory.DEFAULT_FACTORY.create(
+ String.join(TsFileConstant.PATH_SEPARATOR, device)),
+ 0);
+ resource.close();
+
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegion1, Integer.toString(dataRegion1),
resource, false);
+
+ final long deadline = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(1);
+ while (System.currentTimeMillis() < deadline
+ && extractor.getObservedTsFileEventCount() == 0) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ }
+
+ Assert.assertEquals(0, extractor.getObservedTsFileEventCount());
+ }
+ }
+
private Future<?> write2DataRegion(
final int writeNum, final int dataRegionId, final int startNum) {
final File dataRegionDir =
@@ -348,4 +397,36 @@ public class PipeRealtimeExtractTest {
}
});
}
+
+ private static class NoTsFileRealtimeDataRegionSource extends
PipeRealtimeDataRegionSource {
+
+ private final AtomicInteger observedTsFileEventCount = new
AtomicInteger(0);
+
+ @Override
+ public Event supply() {
+ return null;
+ }
+
+ @Override
+ protected void doExtract(final PipeRealtimeEvent event) {
+ if (event.getEvent() instanceof TsFileInsertionEvent) {
+ observedTsFileEventCount.incrementAndGet();
+ }
+
event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(),
false);
+ }
+
+ @Override
+ public boolean isNeedListenToTsFile() {
+ return false;
+ }
+
+ @Override
+ public boolean isNeedListenToInsertNode() {
+ return false;
+ }
+
+ private int getObservedTsFileEventCount() {
+ return observedTsFileEventCount.get();
+ }
+ }
}