This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch init_simple_consensus in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2fde479d10f383a8d5960838c7b75bbd90f1ac61 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Fri Mar 8 16:19:16 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../org/apache/iotdb/consensus/simple/SimpleConsensus.java | 10 ++++++---- .../iotdb/consensus/simple/SimpleConsensusServerImpl.java | 12 ++++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index 59f6cccd6f8..c7d6141ceba 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -168,11 +168,13 @@ class SimpleConsensus implements IConsensus { return null; } - SimpleConsensusServerImpl impl = - new SimpleConsensusServerImpl(peers.get(0), registry.apply(groupId)); - impl.start(); - return impl; + return new SimpleConsensusServerImpl(peers.get(0), registry.apply(groupId)); })) + .map( + impl -> { + impl.start(); + return impl; + }) .orElseThrow( () -> new ConsensusException( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java index 758d4f31081..22b74b74538 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java @@ -26,11 +26,13 @@ import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsensusServerImpl implements IStateMachine { private final Peer peer; private final IStateMachine stateMachine; + private final AtomicBoolean initialized = new AtomicBoolean(false); public SimpleConsensusServerImpl(Peer peer, IStateMachine stateMachine) { this.peer = peer; @@ -47,10 +49,12 @@ public class SimpleConsensusServerImpl implements IStateMachine { @Override public void start() { - stateMachine.start(); - // Notify itself as the leader - stateMachine.event().notifyLeaderChanged(peer.getGroupId(), peer.getNodeId()); - stateMachine.event().notifyLeaderReady(); + if (initialized.compareAndSet(false, true)) { + stateMachine.start(); + // Notify itself as the leader + stateMachine.event().notifyLeaderChanged(peer.getGroupId(), peer.getNodeId()); + stateMachine.event().notifyLeaderReady(); + } } @Override
