This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 23ce17a26e9 Pipe/IoTV2: Persist progress index locally before shutdown
to accurate recovery after restart (#15779) (#15887)
23ce17a26e9 is described below
commit 23ce17a26e9affdf194be5317d8fd65853bb0c9b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jul 9 10:42:36 2025 +0800
Pipe/IoTV2: Persist progress index locally before shutdown to accurate
recovery after restart (#15779) (#15887)
(cherry picked from commit e227a53a726e2401de7924df15face54f6a31c95)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 24 +++++++++++++++++++++
.../iotdb/db/service/DataNodeShutdownHook.java | 3 +++
.../pipe/agent/task/meta/PipeRuntimeMeta.java | 9 ++++++++
.../commons/pipe/agent/task/meta/PipeTaskMeta.java | 25 ++++++++++------------
4 files changed, 47 insertions(+), 14 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ef0d4c6b4d9..cd7cd4ea8fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -817,6 +817,30 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
+ ///////////////////////// Shutdown Logic /////////////////////////
+
+ public void persistAllProgressIndexLocally() {
+ if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+ LOGGER.info(
+ "Pipe progress index persist disabled. Skipping persist all progress index locally.");
+ return;
+ }
+ if (!tryReadLockWithTimeOut(10)) {
+ LOGGER.info("Failed to persist all progress index locally because of timeout.");
+ return;
+ }
+ try {
+ for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+ pipeMeta.getRuntimeMeta().persistProgressIndex();
+ }
+ LOGGER.info("Persist all progress index locally successfully.");
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
///////////////////////// Pipe Consensus /////////////////////////
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 13b87f5bb83..6981c862a34 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -86,6 +87,8 @@ public class DataNodeShutdownHook extends Thread {
triggerSnapshotForAllDataRegion();
}
+ // Persist progress index before shutdown to accurate recovery after restart
+ PipeDataNodeAgent.task().persistAllProgressIndexLocally();
// Shutdown pipe progressIndex background service
PipePeriodicalJobExecutor.shutdownBackgroundService();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 752edae0cf0..402a601e52b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -140,6 +140,15 @@ public class PipeRuntimeMeta {
this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
}
+ public void persistProgressIndex() {
+ // Iterate through all the task metas and persist their progress index
+ for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
+ if (taskMeta.getProgressIndex() != null) {
+ taskMeta.persistProgressIndex();
+ }
+ }
+ }
+
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 6a4ab25db7e..4a753c0e5bf 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -113,34 +113,31 @@ public class PipeTaskMeta {
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
// only pipeTaskMeta that need to updateProgressIndex will persist progress index
// isRegisterPersistTask is used to avoid multiple threads registering persist task concurrently
- if (Objects.nonNull(progressIndexPersistFile)
- && !isRegisterPersistTask.getAndSet(true)
+ if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
&& this.persistProgressIndexFuture == null
- && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+ && !isRegisterPersistTask.getAndSet(true)) {
this.persistProgressIndexFuture =
PipePeriodicalJobExecutor.submitBackgroundJob(
- () -> {
- if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
- persistProgressIndex();
- }
- },
+ this::persistProgressIndex,
0,
PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
}
progressIndex.updateAndGet(
index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
- if (Objects.nonNull(progressIndexPersistFile)
- && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap
- && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
+
+ if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
+ && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap) {
persistProgressIndex();
}
+
return progressIndex.get();
}
- private synchronized void persistProgressIndex() {
- if (lastPersistCount.get() == updateCount.get()) {
- // in case of multiple threads calling updateProgressIndex at the same time
+ public synchronized void persistProgressIndex() {
+ if (Objects.isNull(progressIndexPersistFile)
+ // in case of multiple threads calling updateProgressIndex at the same time
+ || lastPersistCount.get() == updateCount.get()) {
return;
}