This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-flush by this push:
new 34d59fa9c89 cx
34d59fa9c89 is described below
commit 34d59fa9c89a95395f4607ec9104a6ccc83dbd1e
Author: Caideyipi <[email protected]>
AuthorDate: Sat Feb 14 09:44:12 2026 +0800
cx
---
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 2 +-
.../PipeRealtimeDataRegionTsFileSource.java | 3 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 53 ++++++++++------------
.../listener/PipeInsertionDataNodeListener.java | 24 +++++-----
.../apache/iotdb/commons/conf/CommonConfig.java | 2 +-
5 files changed, 40 insertions(+), 44 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 3351958d2a1..ca21821d74e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -653,7 +653,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.db*.**",
- "count(root.db1.d1.at1),count(root.db2.d1.at1),",
+ "count(root.db1.d1.at1),count(root.db1.d2.at1),",
Collections.singleton("2,2,"),
60);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index be1f648d163..352ca3349ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -89,7 +89,8 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
@Override
public boolean isNeedListenToInsertNode() {
- return false;
+ // Mark the tsFile to flush it
+ return shouldExtractInsertion;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e60e1ad898d..f3d1e258395 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -140,17 +140,15 @@ public class PipeDataRegionAssigner implements Closeable {
matchedAndUnmatched
.getLeft()
.forEach(
- extractor -> {
+ source -> {
if (disruptor.isClosed()) {
return;
}
- if (event.getEvent().isGeneratedByPipe() &&
!extractor.isForwardingPipeRequests()) {
+ if (event.getEvent().isGeneratedByPipe() &&
!source.isForwardingPipeRequests()) {
final ProgressReportEvent reportEvent =
new ProgressReportEvent(
- extractor.getPipeName(),
- extractor.getCreationTime(),
- extractor.getPipeTaskMeta());
+ source.getPipeName(), source.getCreationTime(),
source.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
@@ -158,33 +156,33 @@ public class PipeDataRegionAssigner implements Closeable {
reportEvent);
return;
}
-
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
return;
}
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- extractor.getPipeName(),
- extractor.getCreationTime(),
- extractor.getPipeTaskMeta(),
- extractor.getTreePattern(),
- extractor.getTablePattern(),
- String.valueOf(extractor.getUserId()),
- extractor.getUserName(),
- extractor.getCliHostname(),
- extractor.isSkipIfNoPrivileges(),
- extractor.getRealtimeDataExtractionStartTime(),
- extractor.getRealtimeDataExtractionEndTime());
+ source.getPipeName(),
+ source.getCreationTime(),
+ source.getPipeTaskMeta(),
+ source.getTreePattern(),
+ source.getTablePattern(),
+ String.valueOf(source.getUserId()),
+ source.getUserName(),
+ source.getCliHostname(),
+ source.isSkipIfNoPrivileges(),
+ source.getRealtimeDataExtractionStartTime(),
+ source.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
// if using IoTV2, assign a replicateIndex for this realtime
event
if (DataRegionConsensusImpl.getInstance() instanceof
PipeConsensus
&& PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
innerEvent.setReplicateIndexForIoTV2(
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
- extractor.getPipeName()));
+ source.getPipeName()));
LOGGER.debug(
"[{}]Set {} for realtime event {}",
- extractor.getPipeName(),
+ source.getPipeName(),
innerEvent.getReplicateIndexForIoTV2(),
innerEvent);
}
@@ -192,15 +190,14 @@ public class PipeDataRegionAssigner implements Closeable {
if (innerEvent instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) innerEvent;
- tsFileInsertionEvent.disableMod4NonTransferPipes(
- extractor.isShouldTransferModFile());
+
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
}
if (innerEvent instanceof PipeDeleteDataNodeEvent) {
final PipeDeleteDataNodeEvent deleteDataNodeEvent =
(PipeDeleteDataNodeEvent) innerEvent;
final DeletionResourceManager manager =
-
DeletionResourceManager.getInstance(extractor.getDataRegionId());
+
DeletionResourceManager.getInstance(source.getDataRegionId());
// increase deletion resource's reference and bind real
deleteEvent
if (Objects.nonNull(manager)
&& DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(
@@ -217,13 +214,13 @@ public class PipeDataRegionAssigner implements Closeable {
copiedEvent);
return;
}
- extractor.extract(copiedEvent);
+ source.extract(copiedEvent);
});
matchedAndUnmatched
.getRight()
.forEach(
- extractor -> {
+ source -> {
if (disruptor.isClosed()) {
return;
}
@@ -233,9 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
|| innerEvent instanceof TsFileInsertionEvent) {
final ProgressReportEvent reportEvent =
new ProgressReportEvent(
- extractor.getPipeName(),
- extractor.getCreationTime(),
- extractor.getPipeTaskMeta());
+ source.getPipeName(), source.getCreationTime(),
source.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
@@ -243,7 +238,7 @@ public class PipeDataRegionAssigner implements Closeable {
reportEvent);
return;
}
-
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
}
});
}
@@ -260,7 +255,7 @@ public class PipeDataRegionAssigner implements Closeable {
matcher.invalidateCache();
}
- public boolean notMoreExtractorNeededToBeAssigned() {
+ public boolean notMoreSourceNeededToBeAssigned() {
return matcher.getRegisterCount() == 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index d255d80166a..d63cadeb822 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* <p>All events extracted by this listener will be first published to
different
* PipeEventDataRegionAssigners (identified by data region id), and then
PipeEventDataRegionAssigner
- * will filter events and assign them to different
PipeRealtimeEventDataRegionExtractors.
+ * will filter events and assign them to different
PipeRealtimeEventDataRegionSources.
*/
public class PipeInsertionDataNodeListener {
private final ConcurrentMap<Integer, PipeDataRegionAssigner>
dataRegionId2Assigner =
@@ -57,36 +57,36 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
- final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+ final int dataRegionId, final PipeRealtimeDataRegionSource source) {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new
PipeDataRegionAssigner(dataRegionId))
- .startAssignTo(extractor);
+ .startAssignTo(source);
- if (extractor.isNeedListenToTsFile()) {
+ if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount.incrementAndGet();
}
- if (extractor.isNeedListenToInsertNode()) {
+ if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount.incrementAndGet();
}
}
public synchronized void stopListenAndAssign(
- final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+ final int dataRegionId, final PipeRealtimeDataRegionSource source) {
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
if (assigner == null) {
return;
}
- assigner.stopAssignTo(extractor);
+ assigner.stopAssignTo(source);
- if (extractor.isNeedListenToTsFile()) {
+ if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount.decrementAndGet();
}
- if (extractor.isNeedListenToInsertNode()) {
+ if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount.decrementAndGet();
}
- if (assigner.notMoreExtractorNeededToBeAssigned()) {
+ if (assigner.notMoreSourceNeededToBeAssigned()) {
// The removed assigner is the same as the one referenced by the
variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
// This will help to release the memory occupied by the assigner
@@ -101,8 +101,8 @@ public class PipeInsertionDataNodeListener {
final String databaseName,
final TsFileResource tsFileResource,
final boolean isLoaded) {
- // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on
purpose
- // because extractors may use tsfile events when some exceptions occur in
the
+ // We don't judge whether listenToTsFileSourceCount.get() == 0 here on
purpose
+ // because sources may use tsfile events when some exceptions occur in the
// insert nodes listening process.
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 2e600029241..42edc8b8fe7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -323,7 +323,7 @@ public class CommonConfig {
private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
// <= 0 means disabled
- private volatile long pipeTsFileFlushIntervalSeconds = 5 * 60L;
+ private volatile long pipeTsFileFlushIntervalSeconds = 20L;
private volatile boolean pipeMemoryManagementEnabled = true;
private volatile long pipeMemoryAllocateRetryIntervalMs = 50;