This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 458efdbaa89 Optimize ConfigNode ConsensusManager init logic (#12098)
458efdbaa89 is described below
commit 458efdbaa89e3738f0ddbd0e1fc32d96453b5afb
Author: Potato <[email protected]>
AuthorDate: Thu Feb 29 14:56:36 2024 +0800
Optimize ConfigNode ConsensusManager init logic (#12098)
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../statemachine/ConfigRegionStateMachine.java | 6 ++--
.../iotdb/confignode/manager/ClusterManager.java | 10 ------
.../iotdb/confignode/manager/ConfigManager.java | 1 +
.../manager/consensus/ConsensusManager.java | 40 ++++++++++++----------
.../iotdb/confignode/manager/cq/CQManager.java | 11 ------
.../pipe/coordinator/runtime/PipeMetaSyncer.java | 10 ------
6 files changed, 24 insertions(+), 54 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b68370f9668..13d801d9be0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -243,11 +243,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
// Add Metric after leader ready
configManager.addMetrics();
- // we do cq recovery async for two reasons:
- // 1. For performance: cq recovery may be time-consuming, we use another thread to do it in
+ // we do cq recovery async for performance:
+ // cq recovery may be time-consuming, we use another thread to do it in
// make notifyLeaderChanged not blocked by it
- // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
- // initialized after notifyLeaderChanged finished
threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
threadPool.submit(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 6f77bbc5936..0dc662aa13a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
public class ClusterManager {
@@ -72,15 +71,6 @@ public class ClusterManager {
private void generateClusterId() {
String clusterId = String.valueOf(UUID.randomUUID());
UpdateClusterIdPlan updateClusterIdPlan = new UpdateClusterIdPlan(clusterId);
- while (configManager.getConsensusManager() == null) {
- try {
- LOGGER.info("consensus layer is not ready, sleep 100ms...");
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
- }
- }
try {
configManager.getConsensusManager().write(updateClusterIdPlan);
} catch (ConsensusException e) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 760de526241..6bbcf44adbd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -311,6 +311,7 @@ public class ConfigManager implements IManager {
public void initConsensusManager() throws IOException {
this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
+ this.consensusManager.get().start();
}
public void close() throws IOException {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 69647bf7488..f9ce496ea65 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -78,13 +78,33 @@ public class ConsensusManager {
setConsensusLayer(stateMachine);
}
+ public void start() throws IOException {
+ consensusImpl.start();
+ if (SystemPropertiesUtils.isRestarted()) {
+ LOGGER.info("Init ConsensusManager successfully when restarted");
+ } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+ // Create ConsensusGroup that contains only itself
+ // if the current ConfigNode is Seed-ConfigNode
+ try {
+ createPeerForConsensusGroup(
+ Collections.singletonList(
+ new TConfigNodeLocation(
+ SEED_CONFIG_NODE_ID,
+ new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
+ new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))));
+ } catch (ConsensusException e) {
+ LOGGER.error(
+ "Something wrong happened while calling consensus layer's createLocalPeer API.", e);
+ }
+ }
+ }
+
public void close() throws IOException {
consensusImpl.stop();
}
/** ConsensusLayer local implementation. */
private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws IOException {
-
if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
upgrade();
consensusImpl =
@@ -211,24 +231,6 @@ public class ConsensusManager {
ConsensusFactory.CONSTRUCT_FAILED_MSG,
CONF.getConfigNodeConsensusProtocolClass())));
}
- consensusImpl.start();
- if (SystemPropertiesUtils.isRestarted()) {
- LOGGER.info("Init ConsensusManager successfully when restarted");
- } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
- // Create ConsensusGroup that contains only itself
- // if the current ConfigNode is Seed-ConfigNode
- try {
- createPeerForConsensusGroup(
- Collections.singletonList(
- new TConfigNodeLocation(
- SEED_CONFIG_NODE_ID,
- new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
- new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))));
- } catch (ConsensusException e) {
- LOGGER.error(
- "Something wrong happened while calling consensus layer's createLocalPeer API.", e);
- }
- }
}
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 02c6e8df242..ec27ec8cc92 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -134,16 +133,6 @@ public class CQManager {
// 3. get all CQs
List<CQInfo.CQEntry> allCQs = null;
- // wait for consensus layer ready
- while (configManager.getConsensusManager() == null) {
- try {
- LOGGER.info("consensus layer is not ready, sleep 1s...");
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
- }
- }
// keep fetching until we get all CQEntries if this node is still leader
while (needFetch(allCQs)) {
try {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index c80b1018ca3..7ea6aaf5cc9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -70,16 +70,6 @@ public class PipeMetaSyncer {
}
public synchronized void start() {
- while (configManager.getConsensusManager() == null) {
- try {
- LOGGER.info("Consensus layer is not ready, sleep 1s...");
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
- }
- }
-
if (metaSyncFuture == null) {
metaSyncFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(