This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch cp-15779 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f316179c08b81334b6e19c34df40f6f19bc16b30 Author: Steve Yurong Su <r...@apache.org> AuthorDate: Thu Jun 19 12:05:09 2025 +0800 Pipe/IoTV2: Persist progress index locally before shutdown for accurate recovery after restart (#15779) (cherry picked from commit e227a53a726e2401de7924df15face54f6a31c95) --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 24 +++++++++++++++++++++ .../iotdb/db/service/DataNodeShutdownHook.java | 3 +++ .../pipe/agent/task/meta/PipeRuntimeMeta.java | 9 ++++++++ .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 25 ++++++++++------------ 4 files changed, 47 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index ef0d4c6b4d9..cd7cd4ea8fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -817,6 +817,30 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } } + ///////////////////////// Shutdown Logic ///////////////////////// + + public void persistAllProgressIndexLocally() { + if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { + LOGGER.info( + "Pipe progress index persist disabled. 
Skipping persist all progress index locally."); + return; + } + if (!tryReadLockWithTimeOut(10)) { + LOGGER.info("Failed to persist all progress index locally because of timeout."); + return; + } + try { + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + pipeMeta.getRuntimeMeta().persistProgressIndex(); + } + LOGGER.info("Persist all progress index locally successfully."); + } catch (final Exception e) { + LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e); + } finally { + releaseReadLock(); + } + } + ///////////////////////// Pipe Consensus ///////////////////////// public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 13b87f5bb83..6981c862a34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -86,6 +87,8 @@ public class DataNodeShutdownHook extends Thread { triggerSnapshotForAllDataRegion(); } + // Persist progress index before shutdown to accurate recovery after restart + PipeDataNodeAgent.task().persistAllProgressIndexLocally(); // Shutdown pipe progressIndex background service PipePeriodicalJobExecutor.shutdownBackgroundService(); diff 
--git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java index 752edae0cf0..402a601e52b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java @@ -140,6 +140,15 @@ public class PipeRuntimeMeta { this.isStoppedByRuntimeException.set(isStoppedByRuntimeException); } + public void persistProgressIndex() { + // Iterate through all the task metas and persist their progress index + for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) { + if (taskMeta.getProgressIndex() != null) { + taskMeta.persistProgressIndex(); + } + } + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java index 6a4ab25db7e..4a753c0e5bf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java @@ -113,34 +113,31 @@ public class PipeTaskMeta { public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) { // only pipeTaskMeta that need to updateProgressIndex will persist progress index // isRegisterPersistTask is used to avoid multiple threads registering persist task concurrently - if (Objects.nonNull(progressIndexPersistFile) - && !isRegisterPersistTask.getAndSet(true) + if 
(PipeConfig.getInstance().isPipeProgressIndexPersistEnabled() && this.persistProgressIndexFuture == null - && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { + && !isRegisterPersistTask.getAndSet(true)) { this.persistProgressIndexFuture = PipePeriodicalJobExecutor.submitBackgroundJob( - () -> { - if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { - persistProgressIndex(); - } - }, + this::persistProgressIndex, 0, PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs()); } progressIndex.updateAndGet( index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex)); - if (Objects.nonNull(progressIndexPersistFile) - && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap - && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { + + if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled() + && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap) { persistProgressIndex(); } + return progressIndex.get(); } - private synchronized void persistProgressIndex() { - if (lastPersistCount.get() == updateCount.get()) { - // in case of multiple threads calling updateProgressIndex at the same time + public synchronized void persistProgressIndex() { + if (Objects.isNull(progressIndexPersistFile) + // in case of multiple threads calling updateProgressIndex at the same time + || lastPersistCount.get() == updateCount.get()) { return; }