This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 63b4668  Add isLeaderOf to Consensus (#5377)
63b4668 is described below

commit 63b46689f8ba7d0babd2e3cf37dbb08fd9f71ddc
Author: SzyWilliam <[email protected]>
AuthorDate: Thu Mar 31 23:31:10 2022 +0800

    Add isLeaderOf to Consensus (#5377)
    
    * fix transferLeader bug, add isLeaderOf interface
    
    * fix reviews
---
 .../org/apache/iotdb/consensus/IConsensus.java     |  2 +
 .../iotdb/consensus/ratis/RatisConsensus.java      | 62 ++++++++++++++++++----
 .../org/apache/iotdb/consensus/ratis/Utils.java    | 15 ++++--
 .../consensus/standalone/StandAloneConsensus.java  |  5 ++
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  2 +
 5 files changed, 72 insertions(+), 14 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 808d596..81213bb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -56,4 +56,6 @@ public interface IConsensus {
   ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader);
 
   ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId);
+
+  boolean isLeader(ConsensusGroupId groupId);
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d45b975..52251ff 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -85,6 +85,9 @@ public class RatisConsensus implements IConsensus {
   private ClientId localFakeId;
   private AtomicLong localFakeCallId;
 
+  private static final int DEFAULT_PRIORITY = 0;
+  private static final int LEADER_PRIORITY = 1;
+
   private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
   /**
    * This function will use the previous client for groupId to query the latest group info It will
@@ -297,7 +300,7 @@ public class RatisConsensus implements IConsensus {
       return failed(e);
     }
     RaftGroup group = raftGroupMap.get(raftGroupId);
-    RaftPeer peerToAdd = Utils.toRaftPeer(peer);
+    RaftPeer peerToAdd = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
@@ -340,7 +343,7 @@ public class RatisConsensus implements IConsensus {
       return failed(e);
     }
     RaftGroup group = raftGroupMap.get(raftGroupId);
-    RaftPeer peerToRemove = Utils.toRaftPeer(peer);
+    RaftPeer peerToRemove = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
@@ -396,15 +399,38 @@ public class RatisConsensus implements IConsensus {
 
   @Override
   public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
+    // By default, Ratis gives every Raft Peer same priority 0
+    // Ratis does not allow a peer.priority <= currentLeader.priority to becomes the leader
+    // So we have to enhance to leader's priority
+
+    // first fetch the newest information
+    try {
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient client = clientMap.getOrDefault(raftGroupId, null);
-    if (client == null) {
-      return failed(new ConsensusGroupNotExistException(groupId));
+    RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
+    RaftClient client = clientMap.get(raftGroupId);
+    RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader, LEADER_PRIORITY);
+
+    ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
+    for (RaftPeer raftPeer : raftGroup.getPeers()) {
+      if (raftPeer.getId().equals(newRaftLeader.getId())) {
+        newConfiguration.add(newRaftLeader);
+      } else {
+        // degrade every other peer to default priority
+        newConfiguration.add(Utils.toRaftPeer(Utils.getEndPoint(raftPeer), DEFAULT_PRIORITY));
+      }
     }
-    RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader);
 
     RaftClientReply reply = null;
     try {
+      RaftClientReply configChangeReply = client.admin().setConfiguration(newConfiguration);
+      if (!configChangeReply.isSuccess()) {
+        return failed(new RatisRequestFailedException(configChangeReply.getException()));
+      }
       // TODO tuning for timeoutMs
       reply = client.admin().transferLeadership(newRaftLeader.getId(), 2000);
     } catch (IOException e) {
@@ -414,6 +440,21 @@ public class RatisConsensus implements IConsensus {
   }
 
   @Override
+  public boolean isLeader(ConsensusGroupId groupId) {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+
+    boolean isLeader;
+    try {
+      isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
+    } catch (IOException exception) {
+      // if the query fails, simply return not leader
+      logger.warn("isLeader request failed with exception: ", exception);
+      isLeader = false;
+    }
+    return isLeader;
+  }
+
+  @Override
   public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
     return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
@@ -443,7 +484,10 @@ public class RatisConsensus implements IConsensus {
   }
 
   private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
-    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    List<RaftPeer> raftPeers =
+        peers.stream()
+            .map(peer -> Utils.toRaftPeer(peer, DEFAULT_PRIORITY))
+            .collect(Collectors.toList());
     return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
   }
 
@@ -482,8 +526,8 @@ public class RatisConsensus implements IConsensus {
     this.localFakeCallId = new AtomicLong(0);
 
     // create a RaftPeer as endpoint of comm
-    String address = Utils.IP_PORT(endpoint);
-    myself = Utils.toRaftPeer(endpoint);
+    String address = Utils.IPAddress(endpoint);
+    myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
 
     RaftProperties properties = new RaftProperties();
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 2cb09e2..7e264c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -42,7 +42,7 @@ public class Utils {
   private static final String SchemaRegionAbbr = "SR";
   private static final String PartitionRegionAbbr = "PR";
 
-  public static String IP_PORT(Endpoint endpoint) {
+  public static String IPAddress(Endpoint endpoint) {
     return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
   }
 
@@ -69,13 +69,18 @@ public class Utils {
     return String.format("%s-%d", groupTypeAbbr, consensusGroupId.getId());
   }
 
-  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+  // priority is used as ordinal of leader election
+  public static RaftPeer toRaftPeer(Endpoint endpoint, int priority) {
     String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
-    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+    return RaftPeer.newBuilder()
+        .setId(Id)
+        .setAddress(IPAddress(endpoint))
+        .setPriority(priority)
+        .build();
   }
 
-  public static RaftPeer toRaftPeer(Peer peer) {
-    return toRaftPeer(peer.getEndpoint());
+  public static RaftPeer toRaftPeer(Peer peer, int priority) {
+    return toRaftPeer(peer.getEndpoint(), priority);
   }
 
   public static Endpoint getEndPoint(RaftPeer raftPeer) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 3a2e121..d2acbde 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -168,4 +168,9 @@ public class StandAloneConsensus implements IConsensus {
   public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
     return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
+
+  @Override
+  public boolean isLeader(ConsensusGroupId groupId) {
+    return true;
+  }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 4213015..edf16f9 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -165,7 +165,9 @@ public class RatisConsensusTest {
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
     // 6. Remove two Peers from Group (peer 0 and peer 2)
+    // transfer the leader to peer1
     servers.get(0).transferLeader(gid, peer1);
+    Assert.assertTrue(servers.get(1).isLeader(gid));
     // first use removePeer to inform the group leader of configuration change
     servers.get(1).removePeer(gid, peer0);
     servers.get(1).removePeer(gid, peer2);

Reply via email to