This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5904 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 224e65e3e382abf98bb248297c089dde9841a3cb Author: Steve Yurong Su <[email protected]> AuthorDate: Sun May 21 19:09:00 2023 +0800 better way to start PipeMetaSyncer --- .../manager/pipe/runtime/PipeMetaSyncer.java | 47 +++++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java index ba9bba61c6a..df080857e9a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java @@ -37,11 +37,10 @@ public class PipeMetaSyncer { private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncer.class); - private static final ScheduledExecutorService SYNC_EXECUTOR = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.PIPE_META_SYNC_SERVICE.getName()); // TODO: make this configurable + private static final long INITIAL_SYNC_DELAY_MINUTES = 1; private static final long SYNC_INTERVAL_MINUTES = 3; + private static ScheduledExecutorService syncExecutor; private final ConfigManager configManager; @@ -52,11 +51,37 @@ public class PipeMetaSyncer { } public synchronized void start() { + stop(); + + // 1. wait for consensus layer ready + while (configManager.getConsensusManager() == null) { + try { + LOGGER.info("consensus layer is not ready, sleep 1s..."); + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("unexpected interruption during waiting for consensus layer ready."); + } + } + + // 2. start sync executor + if (syncExecutor == null) { + syncExecutor = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.PIPE_META_SYNC_SERVICE.getName()); + LOGGER.info("syncExecutor is started successfully."); + } + + // 3. start meta sync task if (metaSyncFuture == null) { metaSyncFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES); - LOGGER.info("PipeMetaSyncer is started successfully."); + syncExecutor, + this::sync, + INITIAL_SYNC_DELAY_MINUTES, + SYNC_INTERVAL_MINUTES, + TimeUnit.MINUTES); + LOGGER.info("metaSyncFuture is submitted successfully."); } } @@ -74,7 +99,17 @@ public class PipeMetaSyncer { if (metaSyncFuture != null) { metaSyncFuture.cancel(false); metaSyncFuture = null; - LOGGER.info("PipeMetaSyncer is stopped successfully."); + LOGGER.info("metaSyncFuture is cancelled successfully."); + } + + try { + if (syncExecutor != null) { + syncExecutor.shutdown(); + syncExecutor = null; + LOGGER.info("syncExecutor is shutdown successfully."); + } + } catch (Throwable t) { + LOGGER.error("Failed to shutdown syncExecutor", t); } } }
