This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86f0e554bb6b3c17f4ddccaafdafa21379dc7b26
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 22 19:33:53 2023 +0800

    safety start pipe tasks
---
 .../statemachine/ConfigRegionStateMachine.java     |  6 ++++--
 .../manager/pipe/runtime/PipeMetaSyncer.java       | 21 ++++++++++++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 53e7f9ae688..4964ba79774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -64,7 +64,7 @@ public class ConfigRegionStateMachine
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class);
   private static final ExecutorService threadPool =
-      IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
+      IoTDBThreadPoolFactory.newCachedThreadPool("ConfigNode-Manager-Recovery");
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
@@ -218,7 +218,9 @@ public class ConfigRegionStateMachine
       // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
       // initialized after notifyLeaderChanged finished
       threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
-      configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
+
+      threadPool.submit(
+          () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..4d6ebb04ff0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,8 @@ public class PipeMetaSyncer {
   private static final ScheduledExecutorService SYNC_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_META_SYNC_SERVICE.getName());
-  // TODO: make this configurable
+  // TODO: make these configurable
+  private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
   private static final long SYNC_INTERVAL_MINUTES = 3;
 
   private final ConfigManager configManager;
@@ -52,15 +53,29 @@ public class PipeMetaSyncer {
   }
 
   public synchronized void start() {
+    while (configManager.getConsensusManager() == null) {
+      try {
+        LOGGER.info("consensus layer is not ready, sleep 1s...");
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+      }
+    }
+
     if (metaSyncFuture == null) {
       metaSyncFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+              SYNC_EXECUTOR,
+              this::sync,
+              INITIAL_SYNC_DELAY_MINUTES,
+              SYNC_INTERVAL_MINUTES,
+              TimeUnit.MINUTES);
       LOGGER.info("PipeMetaSyncer is started successfully.");
     }
   }
 
-  private void sync() {
+  private synchronized void sync() {
     final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(
