This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch cp-15779
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f316179c08b81334b6e19c34df40f6f19bc16b30
Author: Steve Yurong Su <r...@apache.org>
AuthorDate: Thu Jun 19 12:05:09 2025 +0800

    Pipe/IoTV2: Persist progress index locally before shutdown to ensure accurate recovery after restart (#15779)
    
    (cherry picked from commit e227a53a726e2401de7924df15face54f6a31c95)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 24 +++++++++++++++++++++
 .../iotdb/db/service/DataNodeShutdownHook.java     |  3 +++
 .../pipe/agent/task/meta/PipeRuntimeMeta.java      |  9 ++++++++
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 25 ++++++++++------------
 4 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ef0d4c6b4d9..cd7cd4ea8fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -817,6 +817,30 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
+  ///////////////////////// Shutdown Logic /////////////////////////
+
+  public void persistAllProgressIndexLocally() {
+    if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+      LOGGER.info(
+          "Pipe progress index persist disabled. Skipping persist all progress index locally.");
+      return;
+    }
+    if (!tryReadLockWithTimeOut(10)) {
+      LOGGER.info("Failed to persist all progress index locally because of timeout.");
+      return;
+    }
+    try {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        pipeMeta.getRuntimeMeta().persistProgressIndex();
+      }
+      LOGGER.info("Persist all progress index locally successfully.");
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   ///////////////////////// Pipe Consensus /////////////////////////
 
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 13b87f5bb83..6981c862a34 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -86,6 +87,8 @@ public class DataNodeShutdownHook extends Thread {
       triggerSnapshotForAllDataRegion();
     }
 
+    // Persist progress index before shutdown to accurate recovery after restart
+    PipeDataNodeAgent.task().persistAllProgressIndexLocally();
     // Shutdown pipe progressIndex background service
     PipePeriodicalJobExecutor.shutdownBackgroundService();
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 752edae0cf0..402a601e52b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -140,6 +140,15 @@ public class PipeRuntimeMeta {
     this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
   }
 
+  public void persistProgressIndex() {
+    // Iterate through all the task metas and persist their progress index
+    for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
+      if (taskMeta.getProgressIndex() != null) {
+        taskMeta.persistProgressIndex();
+      }
+    }
+  }
+
   public ByteBuffer serialize() throws IOException {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 6a4ab25db7e..4a753c0e5bf 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -113,34 +113,31 @@ public class PipeTaskMeta {
   public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
     // only pipeTaskMeta that need to updateProgressIndex will persist progress index
     // isRegisterPersistTask is used to avoid multiple threads registering persist task concurrently
-    if (Objects.nonNull(progressIndexPersistFile)
-        && !isRegisterPersistTask.getAndSet(true)
+    if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
         && this.persistProgressIndexFuture == null
-        && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+        && !isRegisterPersistTask.getAndSet(true)) {
       this.persistProgressIndexFuture =
           PipePeriodicalJobExecutor.submitBackgroundJob(
-              () -> {
-                if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
-                  persistProgressIndex();
-                }
-              },
+              this::persistProgressIndex,
               0,
               PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
     }
 
     progressIndex.updateAndGet(
         index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
-    if (Objects.nonNull(progressIndexPersistFile)
-        && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap
-        && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+
+    if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
+        && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap) {
       persistProgressIndex();
     }
+
     return progressIndex.get();
   }
 
-  private synchronized void persistProgressIndex() {
-    if (lastPersistCount.get() == updateCount.get()) {
-      // in case of multiple threads calling updateProgressIndex at the same time
+  public synchronized void persistProgressIndex() {
+    if (Objects.isNull(progressIndexPersistFile)
+        // in case of multiple threads calling updateProgressIndex at the same time
+        || lastPersistCount.get() == updateCount.get()) {
       return;
     }
 

Reply via email to