This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e227a53a726 Pipe/IoTV2: Persist progress index locally before shutdown
to ensure accurate recovery after restart (#15779)
e227a53a726 is described below
commit e227a53a726e2401de7924df15face54f6a31c95
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 19 12:05:09 2025 +0800
Pipe/IoTV2: Persist progress index locally before shutdown to ensure accurate
recovery after restart (#15779)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 24 +++++++++++++++++++++
.../iotdb/db/service/DataNodeShutdownHook.java | 2 ++
.../pipe/agent/task/meta/PipeRuntimeMeta.java | 9 ++++++++
.../commons/pipe/agent/task/meta/PipeTaskMeta.java | 25 ++++++++++------------
4 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8fd099baa06..9d00d5a1206 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -840,6 +840,30 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return isSnapshotMode;
}
+ ///////////////////////// Shutdown Logic /////////////////////////
+
+ public void persistAllProgressIndexLocally() {
+ if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+ LOGGER.info(
+ "Pipe progress index persist disabled. Skipping persist all progress
index locally.");
+ return;
+ }
+ if (!tryReadLockWithTimeOut(10)) {
+ LOGGER.info("Failed to persist all progress index locally because of
timeout.");
+ return;
+ }
+ try {
+ for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+ pipeMeta.getRuntimeMeta().persistProgressIndex();
+ }
+ LOGGER.info("Persist all progress index locally successfully.");
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to record all progress index locally, because {}.",
e.getMessage(), e);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
///////////////////////// Pipe Consensus /////////////////////////
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final
int consensusGroupId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 731c4b09da1..efa628bbcbf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -118,6 +118,8 @@ public class DataNodeShutdownHook extends Thread {
triggerSnapshotForAllDataRegion();
}
+ // Persist progress index before shutdown to accurate recovery after
restart
+ PipeDataNodeAgent.task().persistAllProgressIndexLocally();
// Shutdown all consensus pipe's receiver
PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
// Shutdown pipe progressIndex background service
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 6f3ddd9e0ad..ff77564bd2f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -140,6 +140,15 @@ public class PipeRuntimeMeta {
this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
}
+ public void persistProgressIndex() {
+ // Iterate through all the task metas and persist their progress index
+ for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
+ if (taskMeta.getProgressIndex() != null) {
+ taskMeta.persistProgressIndex();
+ }
+ }
+ }
+
/**
* We use negative regionId to identify the external pipe source, which is
not a consensus group
* id. Then we can reuse the regionId to schedule the external pipe source
and store the progress
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 6a4ab25db7e..4a753c0e5bf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -113,34 +113,31 @@ public class PipeTaskMeta {
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
// only pipeTaskMeta that need to updateProgressIndex will persist
progress index
// isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
- if (Objects.nonNull(progressIndexPersistFile)
- && !isRegisterPersistTask.getAndSet(true)
+ if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
&& this.persistProgressIndexFuture == null
- && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+ && !isRegisterPersistTask.getAndSet(true)) {
this.persistProgressIndexFuture =
PipePeriodicalJobExecutor.submitBackgroundJob(
- () -> {
- if
(PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
- persistProgressIndex();
- }
- },
+ this::persistProgressIndex,
0,
PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
}
progressIndex.updateAndGet(
index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
- if (Objects.nonNull(progressIndexPersistFile)
- && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap
- && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+
+ if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
+ && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap) {
persistProgressIndex();
}
+
return progressIndex.get();
}
- private synchronized void persistProgressIndex() {
- if (lastPersistCount.get() == updateCount.get()) {
- // in case of multiple threads calling updateProgressIndex at the same
time
+ public synchronized void persistProgressIndex() {
+ if (Objects.isNull(progressIndexPersistFile)
+ // in case of multiple threads calling updateProgressIndex at the same
time
+ || lastPersistCount.get() == updateCount.get()) {
return;
}