This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_add_peer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c210da84d15b9c76b861b67d9aeaef0d4ff39ee0 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Sep 5 13:36:23 2022 +0800 polish codes in multileader consensus --- .../consensus/multileader/MultiLeaderConsensus.java | 16 +++++++++++++--- .../consensus/multileader/MultiLeaderServerImpl.java | 5 +++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java index d320951b0e..0d56ed4971 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java @@ -36,6 +36,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.MultiLeaderConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; @@ -248,28 +249,37 @@ public class MultiLeaderConsensus implements IConsensus { } try { // step 1: inactive new Peer to prepare for following steps + logger.info("inactivate new peer: {}", peer); impl.inactivePeer(peer); // step 2: notify all the other Peers to build the sync connection to newPeer + logger.info("notify current peers to build sync log..."); impl.notifyPeersToBuildSyncLogChannel(peer); // step 3: take snapshot + logger.info("start to take snapshot..."); impl.takeSnapshot(); // step 4: transit snapshot + logger.info("start to transit snapshot..."); impl.transitSnapshot(peer); // step 5: let the new peer load snapshot + logger.info("trigger new peer to load snapshot..."); impl.triggerSnapshotLoad(peer); - // step 5: active new Peer + // step 6: active new Peer + logger.info("activate new peer..."); impl.activePeer(peer); } catch (ConsensusGroupAddPeerException e) { - + return ConsensusGenericResponse.newBuilder() + .setSuccess(false) + .setException(new ConsensusException(e.getMessage())) + .build(); } - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index 72dec83fc7..b260da253b 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -370,6 +370,10 @@ public class MultiLeaderServerImpl { // configuration List<Peer> currentMembers = new ArrayList<>(this.configuration); for (Peer peer : currentMembers) { + if (peer.equals(targetPeer)) { + // if the targetPeer is the same as current peer, skip it because removing itself is illegal + continue; + } if (peer.equals(thisNode)) { removeSyncLogChannel(targetPeer); } else { @@ -561,6 +565,7 @@ public class MultiLeaderServerImpl { } public void setActive(boolean active) { + logger.info("set {} active status to {}", this.thisNode, active); this.active = active; } }
