This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 458efdbaa89 Optimize ConfigNode ConsensusManager init logic (#12098)
458efdbaa89 is described below

commit 458efdbaa89e3738f0ddbd0e1fc32d96453b5afb
Author: Potato <[email protected]>
AuthorDate: Thu Feb 29 14:56:36 2024 +0800

    Optimize ConfigNode ConsensusManager init logic (#12098)
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../statemachine/ConfigRegionStateMachine.java     |  6 ++--
 .../iotdb/confignode/manager/ClusterManager.java   | 10 ------
 .../iotdb/confignode/manager/ConfigManager.java    |  1 +
 .../manager/consensus/ConsensusManager.java        | 40 ++++++++++++----------
 .../iotdb/confignode/manager/cq/CQManager.java     | 11 ------
 .../pipe/coordinator/runtime/PipeMetaSyncer.java   | 10 ------
 6 files changed, 24 insertions(+), 54 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b68370f9668..13d801d9be0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -243,11 +243,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
     // Add Metric after leader ready
     configManager.addMetrics();
 
-    // we do cq recovery async for two reasons:
-    // 1. For performance: cq recovery may be time-consuming, we use another thread to do it in
+    // we do cq recovery async for performance:
+    // cq recovery may be time-consuming, we use another thread to do it in
     // make notifyLeaderChanged not blocked by it
-    // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
-    // initialized after notifyLeaderChanged finished
     threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
 
     threadPool.submit(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 6f77bbc5936..0dc662aa13a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 public class ClusterManager {
 
@@ -72,15 +71,6 @@ public class ClusterManager {
   private void generateClusterId() {
     String clusterId = String.valueOf(UUID.randomUUID());
     UpdateClusterIdPlan updateClusterIdPlan = new UpdateClusterIdPlan(clusterId);
-    while (configManager.getConsensusManager() == null) {
-      try {
-        LOGGER.info("consensus layer is not ready, sleep 100ms...");
-        TimeUnit.MILLISECONDS.sleep(100);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
-      }
-    }
     try {
       configManager.getConsensusManager().write(updateClusterIdPlan);
     } catch (ConsensusException e) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 760de526241..6bbcf44adbd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -311,6 +311,7 @@ public class ConfigManager implements IManager {
 
   public void initConsensusManager() throws IOException {
     this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
+    this.consensusManager.get().start();
   }
 
   public void close() throws IOException {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 69647bf7488..f9ce496ea65 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -78,13 +78,33 @@ public class ConsensusManager {
     setConsensusLayer(stateMachine);
   }
 
+  public void start() throws IOException {
+    consensusImpl.start();
+    if (SystemPropertiesUtils.isRestarted()) {
+      LOGGER.info("Init ConsensusManager successfully when restarted");
+    } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+      // Create ConsensusGroup that contains only itself
+      // if the current ConfigNode is Seed-ConfigNode
+      try {
+        createPeerForConsensusGroup(
+            Collections.singletonList(
+                new TConfigNodeLocation(
+                    SEED_CONFIG_NODE_ID,
+                    new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
+                    new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))));
+      } catch (ConsensusException e) {
+        LOGGER.error(
+            "Something wrong happened while calling consensus layer's createLocalPeer API.", e);
+      }
+    }
+  }
+
   public void close() throws IOException {
     consensusImpl.stop();
   }
 
   /** ConsensusLayer local implementation. */
   private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws IOException {
-
     if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
       upgrade();
       consensusImpl =
@@ -211,24 +231,6 @@ public class ConsensusManager {
                               ConsensusFactory.CONSTRUCT_FAILED_MSG,
                               CONF.getConfigNodeConsensusProtocolClass())));
     }
-    consensusImpl.start();
-    if (SystemPropertiesUtils.isRestarted()) {
-      LOGGER.info("Init ConsensusManager successfully when restarted");
-    } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
-      // Create ConsensusGroup that contains only itself
-      // if the current ConfigNode is Seed-ConfigNode
-      try {
-        createPeerForConsensusGroup(
-            Collections.singletonList(
-                new TConfigNodeLocation(
-                    SEED_CONFIG_NODE_ID,
-                    new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
-                    new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))));
-      } catch (ConsensusException e) {
-        LOGGER.error(
-            "Something wrong happened while calling consensus layer's createLocalPeer API.", e);
-      }
-    }
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 02c6e8df242..ec27ec8cc92 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -134,16 +133,6 @@ public class CQManager {
 
       // 3. get all CQs
       List<CQInfo.CQEntry> allCQs = null;
-      // wait for consensus layer ready
-      while (configManager.getConsensusManager() == null) {
-        try {
-          LOGGER.info("consensus layer is not ready, sleep 1s...");
-          TimeUnit.SECONDS.sleep(1);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
-        }
-      }
       // keep fetching until we get all CQEntries if this node is still leader
       while (needFetch(allCQs)) {
         try {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index c80b1018ca3..7ea6aaf5cc9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -70,16 +70,6 @@ public class PipeMetaSyncer {
   }
 
   public synchronized void start() {
-    while (configManager.getConsensusManager() == null) {
-      try {
-        LOGGER.info("Consensus layer is not ready, sleep 1s...");
-        TimeUnit.SECONDS.sleep(1);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
-      }
-    }
-
     if (metaSyncFuture == null) {
       metaSyncFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(

Reply via email to