This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new de0e022  RATIS-1033. Change leader election to add priority 
consideration (#182)
de0e022 is described below

commit de0e0223a8cf1344126b6edc5bd6286b1cff72c7
Author: runzhiwang <[email protected]>
AuthorDate: Thu Aug 27 13:59:49 2020 +0800

    RATIS-1033. Change leader election to add priority consideration (#182)
---
 .../apache/ratis/server/impl/LeaderElection.java   | 42 ++++++++++++-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 16 ++++-
 .../org/apache/ratis/server/impl/ServerState.java  | 19 ++++--
 .../ratis/server/impl/GroupManagementBaseTest.java | 69 ++++++++++++++++++++++
 4 files changed, 136 insertions(+), 10 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index e0ab955..a85c1a5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -46,6 +46,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.ratis.util.LifeCycle.State.NEW;
@@ -269,6 +271,21 @@ class LeaderElection implements Runnable {
     return submitted;
   }
 
+  private Set<RaftPeerId> getHigherPriorityPeers(RaftConfiguration conf) {
+    Set<RaftPeerId> higherPriorityPeers = new HashSet<>();
+
+    int currPriority = conf.getPeer(server.getId()).getPriority();
+    Collection<RaftPeer> peers = conf.getPeers();
+
+    for (RaftPeer peer : peers) {
+      if (peer.getPriority() > currPriority) {
+        higherPriorityPeers.add(peer.getId());
+      }
+    }
+
+    return higherPriorityPeers;
+  }
+
   private ResultAndTerm waitForResults(final long electionTerm, final int 
submitted,
       RaftConfiguration conf, Executor voteExecutor) throws 
InterruptedException {
     final Timestamp timeout = 
Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
@@ -276,10 +293,18 @@ class LeaderElection implements Runnable {
     final List<Exception> exceptions = new ArrayList<>();
     int waitForNum = submitted;
     Collection<RaftPeerId> votedPeers = new ArrayList<>();
+    Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
+
     while (waitForNum > 0 && shouldRun(electionTerm)) {
       final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
       if (waitTime.isNonPositive()) {
-        return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+        if (conf.hasMajority(votedPeers, server.getId())) {
+          // If some higher-priority peer did not respond before the timeout, but 
the candidate got a majority,
+          // the candidate passes the vote.
+          return logAndReturn(Result.PASSED, responses, exceptions, -1);
+        } else {
+          return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+        }
       }
 
       try {
@@ -305,9 +330,22 @@ class LeaderElection implements Runnable {
           return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
               exceptions, r.getTerm());
         }
+
+        // If any peer with higher priority rejects the vote, the candidate cannot 
pass the vote.
+        if (!r.getServerReply().getSuccess() && 
higherPriorityPeers.contains(replierId)) {
+          return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+        }
+
+        // Remove the higher-priority peer that replied, so that an empty higherPriorityPeers 
set means
+        // all higher-priority peers have replied.
+        if (higherPriorityPeers.contains(replierId)) {
+          higherPriorityPeers.remove(replierId);
+        }
+
         if (r.getServerReply().getSuccess()) {
           votedPeers.add(replierId);
-          if (conf.hasMajority(votedPeers, server.getId())) {
+          // If a majority has voted and all peers with higher priority have voted, 
the candidate passes the vote.
+          if (higherPriorityPeers.size() == 0 && conf.hasMajority(votedPeers, 
server.getId())) {
             return logAndReturn(Result.PASSED, responses, exceptions, -1);
           }
         }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1af39c2..f72dde1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -878,9 +878,19 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
         final boolean termUpdated = changeToFollower(candidateTerm, true, 
"recognizeCandidate:" + candidateId);
         // see Section 5.4.1 Election restriction
-        if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
-          state.grantVote(candidateId);
-          voteGranted = true;
+        RaftPeer candidate = getRaftConf().getPeer(candidateId);
+        if (fs != null && candidate != null) {
+          int compare = state.compareLog(candidateLastEntry);
+          int priority = getRaftConf().getPeer(getId()).getPriority();
+          LOG.info("{} priority:{} candidate:{} candidatePriority:{} 
compare:{}",
+              this, priority, candidate, candidate.getPriority(), compare);
+          // vote for candidate if:
+          // 1. log lags behind candidate
+          // 2. log equals candidate's, and priority is less than or equal to candidate's
+          if (compare < 0 || (compare == 0 && priority <= 
candidate.getPriority())) {
+            state.grantVote(candidateId);
+            voteGranted = true;
+          }
         }
         if (termUpdated || voteGranted) {
           state.persistMetadata(); // sync metafile
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 380bcf2..1213d14 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -318,19 +318,28 @@ public class ServerState implements Closeable {
     return false;
   }
 
-  boolean isLogUpToDate(TermIndex candidateLastEntry) {
+  int compareLog(TermIndex candidateLastEntry) {
     TermIndex local = log.getLastEntryTermIndex();
     // need to take into account snapshot
     SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-     if (local == null && snapshot == null) {
-      return true;
+    if (local == null && snapshot == null) {
+      // If the lastEntry of the candidate is null, the proto will transfer an 
empty TermIndexProto,
+      // so the term and index of candidateLastEntry will both be 0.
+      // Besides, candidateLastEntry currently comes from the proto, so it is never null.
+      // But we still check candidateLastEntry == null here,
+      // in case candidateLastEntry does not come from the proto in the future.
+      if (candidateLastEntry == null ||
+          (candidateLastEntry.getTerm() == 0 && candidateLastEntry.getIndex() 
== 0)) {
+        return 0;
+      }
+      return -1;
     } else if (candidateLastEntry == null) {
-      return false;
+      return 1;
     }
     if (local == null || (snapshot != null && snapshot.getIndex() > 
local.getIndex())) {
       local = snapshot.getTermIndex();
     }
-    return local.compareTo(candidateLastEntry) <= 0;
+    return local.compareTo(candidateLastEntry);
   }
 
   @Override
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 3e5f3a3..903f903 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -43,12 +43,14 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.Random;
 import java.util.stream.Collectors;
 
 public abstract class GroupManagementBaseTest extends BaseTest {
@@ -75,6 +77,73 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
   }
 
   @Test
+  public void testGroupWithPriority() throws Exception {
+    final MiniRaftCluster cluster = getCluster(0);
+    LOG.info("Start testMultiGroup" + cluster.printServers());
+
+    // Start server with null group
+    final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 
0))
+        .map(RaftPeerId::valueOf).collect(Collectors.toList());
+    ids.forEach(id -> cluster.putNewServer(id, null, true));
+    LOG.info("putNewServer: " + cluster.printServers());
+
+    cluster.start();
+
+    // Make sure that there are no leaders.
+    TimeUnit.SECONDS.sleep(1);
+    LOG.info("start: " + cluster.printServers());
+    Assert.assertNull(cluster.getLeader());
+
+    // Add groups
+    List<RaftPeer> peers = cluster.getPeers();
+    Random r = new Random(1);
+    int suggestedLeaderIndex = r.nextInt(peers.size());
+
+    List<RaftPeer> peersWithPriority = new ArrayList<>();
+    for (int i = 0; i < peers.size(); i++) {
+      RaftPeer peer = peers.get(i);
+      if (i == suggestedLeaderIndex) {
+        peersWithPriority.add(new RaftPeer(peer.getId(), peer.getAddress(), 
2));
+      } else {
+          peersWithPriority.add(new RaftPeer(peer.getId(), peer.getAddress(), 
1));
+      }
+    }
+
+    final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), 
peersWithPriority);
+    LOG.info("add new group: " + newGroup);
+    try (final RaftClient client = cluster.createClient(newGroup)) {
+      for (RaftPeer p : newGroup.getPeers()) {
+        client.groupAdd(newGroup, p.getId());
+      }
+    }
+
+    JavaUtils.attempt(() -> {
+      RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, 
newGroup.getGroupId());
+      Assert.assertTrue(leader.getId() == 
peers.get(suggestedLeaderIndex).getId());
+    }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), 
"testMultiGroupWithPriority", LOG);
+
+    String suggestedLeader = 
peers.get(suggestedLeaderIndex).getId().toString();
+
+    // Isolate the leader, so that a follower will trigger a leader election.
+    // Because the leader is isolated, it cannot vote, and the candidate waits until 
timeout;
+    // then, if the candidate gets a majority, the candidate can pass the vote.
+    
BlockRequestHandlingInjection.getInstance().blockRequestor(suggestedLeader);
+    BlockRequestHandlingInjection.getInstance().blockReplier(suggestedLeader);
+    cluster.setBlockRequestsFrom(suggestedLeader, true);
+
+    JavaUtils.attempt(() -> {
+      RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, 
newGroup.getGroupId());
+      Assert.assertTrue(leader.getId() != 
peers.get(suggestedLeaderIndex).getId());
+    }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), 
"testMultiGroupWithPriority", LOG);
+
+    
BlockRequestHandlingInjection.getInstance().unblockRequestor(suggestedLeader);
+    
BlockRequestHandlingInjection.getInstance().unblockReplier(suggestedLeader);
+    cluster.setBlockRequestsFrom(suggestedLeader, false);
+
+    cluster.shutdown();
+  }
+
+  @Test
   public void testSingleGroupRestart() throws Exception {
     final MiniRaftCluster cluster = getCluster(0);
     LOG.info("Start testMultiGroup" + cluster.printServers());

Reply via email to