This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-meta-sync-skipper in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9cbfc78ebb70a8732427aa9cab64b3482f87544b Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Mar 21 19:22:26 2024 +0800 Pipe: avoid executing too many PipeMetaSyncProcedure after system reboot --- .../impl/pipe/runtime/PipeMetaSyncProcedure.java | 26 +++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index ff4b6c335e4..10909902f39 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -20,11 +20,13 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; +import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; @@ -39,15 +41,37 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncProcedure.class); + private static final long MIN_EXECUTION_INTERVAL_MS = + PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 << 1; + // No need to serialize this field + private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0); + public PipeMetaSyncProcedure() { super(); } + @Override + protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { + // Skip the procedure if the last execution time is within the minimum execution interval. + // Often used to prevent the procedure from being executed too frequently when system reboot. + if (System.currentTimeMillis() - LAST_EXECUTION_TIME.get() < MIN_EXECUTION_INTERVAL_MS) { + // Skip by setting the pipeTaskInfo to null + pipeTaskInfo = null; + LOGGER.info( + "PipeMetaSyncProcedure: executeFromValidateTask, skip the procedure due to the last execution time {}", + LAST_EXECUTION_TIME.get()); + return ProcedureLockState.LOCK_ACQUIRED; + } + + return super.acquireLock(configNodeProcedureEnv); + } + @Override protected PipeTaskOperation getOperation() { return PipeTaskOperation.SYNC_PIPE_META; @@ -57,7 +81,7 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask"); - // Do nothing + LAST_EXECUTION_TIME.set(System.currentTimeMillis()); return false; }
