This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch record-progress-when-shutdown in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 766c8c1afa1b780ba9d3dfced8f17075fb506a0b Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 18 19:02:00 2025 +0800 Pipe: Persist progress index before shutdown to ensure accurate recovery after restart --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 19 +++++++++++++++++++ .../apache/iotdb/db/service/DataNodeShutdownHook.java | 2 ++ .../commons/pipe/agent/task/meta/PipeRuntimeMeta.java | 9 +++++++++ .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 2 +- 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 8fd099baa06..15d99f2cff9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -840,6 +840,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return isSnapshotMode; } + ///////////////////////// Shutdown Logic ///////////////////////// + + public void persistAllProgressIndexLocally() { + if (!tryReadLockWithTimeOut(10)) { + LOGGER.info("Failed to persist all progress index locally because of timeout."); + return; + } + try { + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + pipeMeta.getRuntimeMeta().persistProgressIndex(); + } + LOGGER.info("Persist all progress index locally successfully."); + } catch (final Exception e) { + LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e); + } finally { + releaseReadLock(); + } + } + ///////////////////////// Pipe Consensus ///////////////////////// public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 731c4b09da1..efa628bbcbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -118,6 +118,8 @@ public class DataNodeShutdownHook extends Thread { triggerSnapshotForAllDataRegion(); } + // Persist progress index before shutdown to accurate recovery after restart + PipeDataNodeAgent.task().persistAllProgressIndexLocally(); // Shutdown all consensus pipe's receiver PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor(); // Shutdown pipe progressIndex background service diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java index 6f3ddd9e0ad..ff77564bd2f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java @@ -140,6 +140,15 @@ public class PipeRuntimeMeta { this.isStoppedByRuntimeException.set(isStoppedByRuntimeException); } + public void persistProgressIndex() { + // Iterate through all the task metas and persist their progress index + for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) { + if (taskMeta.getProgressIndex() != null) { + taskMeta.persistProgressIndex(); + } + } + } + /** * We use negative regionId to identify the external pipe source, which is not a consensus group * id. 
Then we can reuse the regionId to schedule the external pipe source and store the progress diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java index 6a4ab25db7e..2bd40510dc4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java @@ -138,7 +138,7 @@ public class PipeTaskMeta { return progressIndex.get(); } - private synchronized void persistProgressIndex() { + public synchronized void persistProgressIndex() { if (lastPersistCount.get() == updateCount.get()) { // in case of multiple threads calling updateProgressIndex at the same time return;
