This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-flush by this push:
new 590831ce426 partial
590831ce426 is described below
commit 590831ce42681fe177f3eff3dfc86f63f8502fc1
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 14:41:50 2026 +0800
partial
---
.../PipeRealtimeDataRegionHybridSource.java | 2 ++
.../PipeTsFileEpochProgressIndexKeeper.java | 26 ++++++++++++++++++----
.../apache/iotdb/commons/conf/CommonConfig.java | 15 +++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++
5 files changed, 48 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 0721683f4d2..64fd0846465 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -176,6 +176,8 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .markAsExtracted(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index ff7d90c377d..fbb6502ab17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -22,6 +22,8 @@ package
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.tsfile.utils.Pair;
+
import javax.annotation.Nonnull;
import java.util.Map;
@@ -32,15 +34,31 @@ import java.util.concurrent.ConcurrentHashMap;
public class PipeTsFileEpochProgressIndexKeeper {
// data region id -> pipeName -> tsFile path -> max progress index
- private final Map<String, Map<String, Map<String, TsFileResource>>>
progressIndexKeeper =
- new ConcurrentHashMap<>();
+ private final Map<String, Map<String, Map<String, Pair<TsFileResource,
Long>>>>
+ progressIndexKeeper = new ConcurrentHashMap<>();
public synchronized void registerProgressIndex(
final String dataRegionId, final String pipeName, final TsFileResource
resource) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
- .putIfAbsent(resource.getTsFilePath(), resource);
+ .putIfAbsent(resource.getTsFilePath(), new Pair<>(resource,
System.currentTimeMillis()));
+ }
+
+ public synchronized void markAsExtracted(
+ final String dataRegionId, final String pipeName, final String filePath)
{
+ final Pair<TsFileResource, Long> pair =
+ progressIndexKeeper
+ .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+ .get(filePath);
+ if (Objects.nonNull(pair)) {
+ pair.setRight(Long.MAX_VALUE);
+ }
+ }
+
+ public void flushAllTsFiles(final String dataRegionId) {
+
}
public synchronized void eliminateProgressIndex(
@@ -64,7 +82,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
.filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
.map(Entry::getValue)
.filter(Objects::nonNull)
- .anyMatch(resource ->
!resource.getMaxProgressIndex().isAfter(progressIndex));
+ .anyMatch(resource ->
!resource.getLeft().getMaxProgressIndex().isAfter(progressIndex));
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cf68da89553..2e600029241 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -322,6 +322,9 @@ public class CommonConfig {
private volatile int pipeTsFilePinMaxLogNumPerRound = 10;
private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
+ // <= 0 means disabled
+ private volatile long pipeTsFileFlushIntervalSeconds = 5 * 60L;
+
private volatile boolean pipeMemoryManagementEnabled = true;
private volatile long pipeMemoryAllocateRetryIntervalMs = 50;
private volatile int pipeMemoryAllocateMaxRetries = 10;
@@ -1783,6 +1786,18 @@ public class CommonConfig {
"pipeTsFilePinMaxLogIntervalRounds is set to {}",
pipeTsFilePinMaxLogIntervalRounds);
}
+ public long getPipeTsFileFlushIntervalSeconds() {
+ return pipeTsFileFlushIntervalSeconds;
+ }
+
+ public void setPipeTsFileFlushIntervalSeconds(long
pipeTsFileFlushIntervalSeconds) {
+ if (this.pipeTsFileFlushIntervalSeconds == pipeTsFileFlushIntervalSeconds)
{
+ return;
+ }
+ this.pipeTsFileFlushIntervalSeconds = pipeTsFileFlushIntervalSeconds;
+ logger.info("pipeTsFileFlushIntervalSeconds is set to {}",
pipeTsFileFlushIntervalSeconds);
+ }
+
public boolean getPipeMemoryManagementEnabled() {
return pipeMemoryManagementEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a49caa53368..16539d82d8a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -387,6 +387,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
}
+ public long getPipeTsFileFlushIntervalSeconds() {
+ return COMMON_CONFIG.getPipeTsFileFlushIntervalSeconds();
+ }
+
public long getPipeLoggerCacheMaxSizeInBytes() {
return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 832517b9745..6e72b4919fb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -131,6 +131,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_tsfile_pin_max_log_interval_rounds",
String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds()))));
+ config.setPipeTsFileFlushIntervalSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_tsfile_flush_interval_seconds",
+ String.valueOf(config.getPipeTsFileFlushIntervalSeconds()))));
config.setPipeMemoryManagementEnabled(
Boolean.parseBoolean(
properties.getProperty(