This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e227a53a726 Pipe/IoTV2: Persist progress index locally before shutdown 
to accurate recovery after restart (#15779)
e227a53a726 is described below

commit e227a53a726e2401de7924df15face54f6a31c95
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 19 12:05:09 2025 +0800

    Pipe/IoTV2: Persist progress index locally before shutdown to accurate 
recovery after restart (#15779)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 24 +++++++++++++++++++++
 .../iotdb/db/service/DataNodeShutdownHook.java     |  2 ++
 .../pipe/agent/task/meta/PipeRuntimeMeta.java      |  9 ++++++++
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 25 ++++++++++------------
 4 files changed, 46 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8fd099baa06..9d00d5a1206 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -840,6 +840,30 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return isSnapshotMode;
   }
 
+  ///////////////////////// Shutdown Logic /////////////////////////
+
+  public void persistAllProgressIndexLocally() {
+    if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+      LOGGER.info(
+          "Pipe progress index persist disabled. Skipping persist all progress 
index locally.");
+      return;
+    }
+    if (!tryReadLockWithTimeOut(10)) {
+      LOGGER.info("Failed to persist all progress index locally because of 
timeout.");
+      return;
+    }
+    try {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        pipeMeta.getRuntimeMeta().persistProgressIndex();
+      }
+      LOGGER.info("Persist all progress index locally successfully.");
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to record all progress index locally, because {}.", 
e.getMessage(), e);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   ///////////////////////// Pipe Consensus /////////////////////////
 
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 731c4b09da1..efa628bbcbf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -118,6 +118,8 @@ public class DataNodeShutdownHook extends Thread {
       triggerSnapshotForAllDataRegion();
     }
 
+    // Persist progress index before shutdown to accurate recovery after 
restart
+    PipeDataNodeAgent.task().persistAllProgressIndexLocally();
     // Shutdown all consensus pipe's receiver
     PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
     // Shutdown pipe progressIndex background service
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 6f3ddd9e0ad..ff77564bd2f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -140,6 +140,15 @@ public class PipeRuntimeMeta {
     this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
   }
 
+  public void persistProgressIndex() {
+    // Iterate through all the task metas and persist their progress index
+    for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
+      if (taskMeta.getProgressIndex() != null) {
+        taskMeta.persistProgressIndex();
+      }
+    }
+  }
+
   /**
    * We use negative regionId to identify the external pipe source, which is 
not a consensus group
    * id. Then we can reuse the regionId to schedule the external pipe source 
and store the progress
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 6a4ab25db7e..4a753c0e5bf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -113,34 +113,31 @@ public class PipeTaskMeta {
   public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
     // only pipeTaskMeta that need to updateProgressIndex will persist 
progress index
     // isRegisterPersistTask is used to avoid multiple threads registering 
persist task concurrently
-    if (Objects.nonNull(progressIndexPersistFile)
-        && !isRegisterPersistTask.getAndSet(true)
+    if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
         && this.persistProgressIndexFuture == null
-        && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+        && !isRegisterPersistTask.getAndSet(true)) {
       this.persistProgressIndexFuture =
           PipePeriodicalJobExecutor.submitBackgroundJob(
-              () -> {
-                if 
(PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
-                  persistProgressIndex();
-                }
-              },
+              this::persistProgressIndex,
               0,
               PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
     }
 
     progressIndex.updateAndGet(
         index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
-    if (Objects.nonNull(progressIndexPersistFile)
-        && updateCount.incrementAndGet() - lastPersistCount.get() > 
checkPointGap
-        && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+
+    if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
+        && updateCount.incrementAndGet() - lastPersistCount.get() > 
checkPointGap) {
       persistProgressIndex();
     }
+
     return progressIndex.get();
   }
 
-  private synchronized void persistProgressIndex() {
-    if (lastPersistCount.get() == updateCount.get()) {
-      // in case of multiple threads calling updateProgressIndex at the same 
time
+  public synchronized void persistProgressIndex() {
+    if (Objects.isNull(progressIndexPersistFile)
+        // in case of multiple threads calling updateProgressIndex at the same 
time
+        || lastPersistCount.get() == updateCount.get()) {
       return;
     }
 

Reply via email to