This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-flush by this push:
new dea7feb59eb may-fix
dea7feb59eb is described below
commit dea7feb59eb7923629683f033f5d09e9409665fb
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 16:23:50 2026 +0800
may-fix
---
.../dataregion/realtime/PipeRealtimeDataRegionHybridSource.java | 2 +-
.../dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java | 6 +++++-
.../assigner/PipeTsFileEpochProgressIndexAndFlushManager.java | 2 +-
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 5bf5e3fd4a5..bb26b5e18da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -84,7 +84,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
if (canNotUseTabletAnymore(event)) {
event.getTsFileEpoch().migrateState(this, curState ->
TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
- .registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
+ .registerResource(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
} else {
event
.getTsFileEpoch()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index f6685ced073..7c9d5149817 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -40,6 +40,8 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
@Override
protected void doExtract(PipeRealtimeEvent event) {
+
PipeTsFileEpochProgressIndexAndFlushManager.getInstance().flushAllTimeoutTsFiles();
+
if (event.getEvent() instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
return;
@@ -52,7 +54,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
- .registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
+ .registerResource(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
false);
@@ -75,6 +77,8 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
false);
}
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+ .markAsExtracted(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
event.getTsFileEpoch().clearState(this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
index 677fc92710a..20b2dfe410f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
@@ -40,7 +40,7 @@ public class PipeTsFileEpochProgressIndexAndFlushManager {
private final Map<Integer, Map<String, Map<String, Pair<TsFileResource,
Long>>>>
progressIndexKeeper = new ConcurrentHashMap<>();
- public synchronized void registerProgressIndex(
+ public synchronized void registerResource(
final int dataRegionId, final String pipeName, final TsFileResource
resource) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())