This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 63b4668 Add isLeaderOf to Consensus (#5377)
63b4668 is described below
commit 63b46689f8ba7d0babd2e3cf37dbb08fd9f71ddc
Author: SzyWilliam <[email protected]>
AuthorDate: Thu Mar 31 23:31:10 2022 +0800
Add isLeaderOf to Consensus (#5377)
* fix transferLeader bug, add isLeaderOf interface
* fix reviews
---
.../org/apache/iotdb/consensus/IConsensus.java | 2 +
.../iotdb/consensus/ratis/RatisConsensus.java | 62 ++++++++++++++++++----
.../org/apache/iotdb/consensus/ratis/Utils.java | 15 ++++--
.../consensus/standalone/StandAloneConsensus.java | 5 ++
.../iotdb/consensus/ratis/RatisConsensusTest.java | 2 +
5 files changed, 72 insertions(+), 14 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 808d596..81213bb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -56,4 +56,6 @@ public interface IConsensus {
ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer
newLeader);
ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId);
+
+ boolean isLeader(ConsensusGroupId groupId);
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d45b975..52251ff 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -85,6 +85,9 @@ public class RatisConsensus implements IConsensus {
private ClientId localFakeId;
private AtomicLong localFakeCallId;
+ private static final int DEFAULT_PRIORITY = 0;
+ private static final int LEADER_PRIORITY = 1;
+
private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
/**
* This function will use the previous client for groupId to query the
latest group info It will
@@ -297,7 +300,7 @@ public class RatisConsensus implements IConsensus {
return failed(e);
}
RaftGroup group = raftGroupMap.get(raftGroupId);
- RaftPeer peerToAdd = Utils.toRaftPeer(peer);
+ RaftPeer peerToAdd = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
@@ -340,7 +343,7 @@ public class RatisConsensus implements IConsensus {
return failed(e);
}
RaftGroup group = raftGroupMap.get(raftGroupId);
- RaftPeer peerToRemove = Utils.toRaftPeer(peer);
+ RaftPeer peerToRemove = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
@@ -396,15 +399,38 @@ public class RatisConsensus implements IConsensus {
@Override
public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId,
Peer newLeader) {
+ // By default, Ratis gives every Raft peer the same priority 0.
+ // Ratis does not allow a peer whose priority <= currentLeader.priority to
become the leader,
+ // so we have to raise the new leader's priority.
+
+ // first fetch the newest information
+ try {
+ syncGroupInfoAndRebuildClient(groupId);
+ } catch (ConsensusGroupNotExistException e) {
+ return failed(e);
+ }
+
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftClient client = clientMap.getOrDefault(raftGroupId, null);
- if (client == null) {
- return failed(new ConsensusGroupNotExistException(groupId));
+ RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
+ RaftClient client = clientMap.get(raftGroupId);
+ RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader, LEADER_PRIORITY);
+
+ ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
+ for (RaftPeer raftPeer : raftGroup.getPeers()) {
+ if (raftPeer.getId().equals(newRaftLeader.getId())) {
+ newConfiguration.add(newRaftLeader);
+ } else {
+ // reset every other peer to the default priority
+ newConfiguration.add(Utils.toRaftPeer(Utils.getEndPoint(raftPeer),
DEFAULT_PRIORITY));
+ }
}
- RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader);
RaftClientReply reply = null;
try {
+ RaftClientReply configChangeReply =
client.admin().setConfiguration(newConfiguration);
+ if (!configChangeReply.isSuccess()) {
+ return failed(new
RatisRequestFailedException(configChangeReply.getException()));
+ }
// TODO tuning for timeoutMs
reply = client.admin().transferLeadership(newRaftLeader.getId(), 2000);
} catch (IOException e) {
@@ -414,6 +440,21 @@ public class RatisConsensus implements IConsensus {
}
@Override
+ public boolean isLeader(ConsensusGroupId groupId) {
+ RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+
+ boolean isLeader;
+ try {
+ isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
+ } catch (IOException exception) {
+ // if the query fails, simply return not leader
+ logger.warn("isLeader request failed with exception: ", exception);
+ isLeader = false;
+ }
+ return isLeader;
+ }
+
+ @Override
public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
}
@@ -443,7 +484,10 @@ public class RatisConsensus implements IConsensus {
}
private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers)
{
- List<RaftPeer> raftPeers =
peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+ List<RaftPeer> raftPeers =
+ peers.stream()
+ .map(peer -> Utils.toRaftPeer(peer, DEFAULT_PRIORITY))
+ .collect(Collectors.toList());
return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
}
@@ -482,8 +526,8 @@ public class RatisConsensus implements IConsensus {
this.localFakeCallId = new AtomicLong(0);
// create a RaftPeer as endpoint of comm
- String address = Utils.IP_PORT(endpoint);
- myself = Utils.toRaftPeer(endpoint);
+ String address = Utils.IPAddress(endpoint);
+ myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
RaftProperties properties = new RaftProperties();
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 2cb09e2..7e264c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -42,7 +42,7 @@ public class Utils {
private static final String SchemaRegionAbbr = "SR";
private static final String PartitionRegionAbbr = "PR";
- public static String IP_PORT(Endpoint endpoint) {
+ public static String IPAddress(Endpoint endpoint) {
return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
}
@@ -69,13 +69,18 @@ public class Utils {
return String.format("%s-%d", groupTypeAbbr, consensusGroupId.getId());
}
- public static RaftPeer toRaftPeer(Endpoint endpoint) {
+ // priority sets the peer's precedence in leader election (higher is preferred)
+ public static RaftPeer toRaftPeer(Endpoint endpoint, int priority) {
String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
- return
RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+ return RaftPeer.newBuilder()
+ .setId(Id)
+ .setAddress(IPAddress(endpoint))
+ .setPriority(priority)
+ .build();
}
- public static RaftPeer toRaftPeer(Peer peer) {
- return toRaftPeer(peer.getEndpoint());
+ public static RaftPeer toRaftPeer(Peer peer, int priority) {
+ return toRaftPeer(peer.getEndpoint(), priority);
}
public static Endpoint getEndPoint(RaftPeer raftPeer) {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 3a2e121..d2acbde 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -168,4 +168,9 @@ public class StandAloneConsensus implements IConsensus {
public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
}
+
+ @Override
+ public boolean isLeader(ConsensusGroupId groupId) {
+ return true;
+ }
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 4213015..edf16f9 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -165,7 +165,9 @@ public class RatisConsensusTest {
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
// 6. Remove two Peers from Group (peer 0 and peer 2)
+ // transfer the leader to peer1
servers.get(0).transferLeader(gid, peer1);
+ Assert.assertTrue(servers.get(1).isLeader(gid));
// first use removePeer to inform the group leader of configuration change
servers.get(1).removePeer(gid, peer0);
servers.get(1).removePeer(gid, peer2);