This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 052e109  RATIS-1034. Change leader heartbeat to yield to a higher priority leader (#184)
052e109 is described below

commit 052e109bf33bedaf6754c6a26f55c2eb1d9dbae9
Author: runzhiwang <[email protected]>
AuthorDate: Mon Aug 31 12:52:19 2020 +0800

    RATIS-1034. Change leader heartbeat to yield to a higher priority leader (#184)
---
 .../apache/ratis/server/impl/LeaderElection.java   | 10 +---
 .../org/apache/ratis/server/impl/LeaderState.java  | 59 ++++++++++++++++++++++
 .../apache/ratis/server/impl/RaftServerImpl.java   |  2 +-
 .../org/apache/ratis/server/impl/ServerState.java  | 26 ++++++----
 .../ratis/server/impl/GroupManagementBaseTest.java | 16 ++++++
 5 files changed, 94 insertions(+), 19 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a85c1a5..27e5a28 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -22,7 +22,6 @@ import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.statemachine.SnapshotInfo;
 import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.LifeCycle;
@@ -210,14 +209,7 @@ class LeaderElection implements Runnable {
       }
       LOG.info("{}: begin an election at term {} for {}", this, electionTerm, 
conf);
 
-      TermIndex lastEntry = state.getLog().getLastEntryTermIndex();
-      if (lastEntry == null) {
-        // lastEntry may need to be derived from snapshot
-        SnapshotInfo snapshot = state.getLatestSnapshot();
-        if (snapshot != null) {
-          lastEntry = snapshot.getTermIndex();
-        }
-      }
+      TermIndex lastEntry = state.getLastEntry();
 
       final ResultAndTerm r;
       final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 14ad87a..11631e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -485,6 +485,18 @@ public class LeaderState {
     }
   }
 
+  private synchronized void stepDown(long term, TermIndex lastEntry) {
+    ServerState state = server.getState();
+    TermIndex currLastEntry = state.getLastEntry();
+    if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
+      LOG.warn("{} can not stepDown because currLastEntry:{} did not match 
lastEntry:{}",
+          this, currLastEntry, lastEntry);
+      return;
+    }
+
+    stepDown(term);
+  }
+
   private void prepare() {
     synchronized (server) {
       if (running) {
@@ -517,6 +529,7 @@ public class LeaderState {
             } else if (inStagingState()) {
               checkStaging();
             } else {
+              yieldLeaderToHigherPriorityPeer();
               checkLeadership();
             }
           }
@@ -794,12 +807,58 @@ public class LeaderState {
     return lists;
   }
 
+  private void yieldLeaderToHigherPriorityPeer() {
+    if (!server.getRole().isLeader()) {
+      return;
+    }
+
+    final RaftConfiguration conf = server.getRaftConf();
+    int leaderPriority = conf.getPeer(server.getId()).getPriority();
+
+    TermIndex leaderLastEntry = server.getState().getLastEntry();
+
+    for (LogAppender logAppender : senders.getSenders()) {
+      FollowerInfo followerInfo = logAppender.getFollower();
+      RaftPeerId followerID = followerInfo.getPeer().getId();
+      int followerPriority = conf.getPeer(followerID).getPriority();
+
+      if (followerPriority <= leaderPriority) {
+        continue;
+      }
+
+      if (leaderLastEntry == null) {
+        LOG.info("{} stepDown leadership on term:{} because follower's 
priority:{} is higher than leader's:{} " +
+                "and leader's lastEntry is null",
+            this, currentTerm, followerPriority, leaderPriority);
+
+        // step down as follower
+        stepDown(currentTerm, server.getState().getLastEntry());
+        return;
+      }
+
+      if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
+        LOG.info("{} stepDown leadership on term:{} because follower's 
priority:{} is higher than leader's:{} " +
+                "and follower's lastEntry index:{} catch up with leader's:{}",
+            this, currentTerm, followerPriority, leaderPriority, 
followerInfo.getMatchIndex(),
+            leaderLastEntry.getIndex());
+
+        // step down as follower
+        stepDown(currentTerm, server.getState().getLastEntry());
+        return;
+      }
+    }
+  }
+
   /**
    * See the thesis section 6.2: A leader in Raft steps down
    * if an election timeout elapses without a successful
    * round of heartbeats to a majority of its cluster.
    */
   private void checkLeadership() {
+    if (!server.getRole().isLeader()) {
+      return;
+    }
+
     // The initial value of lastRpcResponseTime in FollowerInfo is set by
     // LeaderState::addSenders(), which is fake and used to trigger an
     // immediate round of AppendEntries request. Since candidates collect
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f72dde1..79436d0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -880,7 +880,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         // see Section 5.4.1 Election restriction
         RaftPeer candidate = getRaftConf().getPeer(candidateId);
         if (fs != null && candidate != null) {
-          int compare = state.compareLog(candidateLastEntry);
+          int compare = ServerState.compareLog(state.getLastEntry(), 
candidateLastEntry);
           int priority = getRaftConf().getPeer(getId()).getPriority();
           LOG.info("{} priority:{} candidate:{} candidatePriority:{} 
compare:{}",
               this, priority, candidate, candidate.getPriority(), compare);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 1213d14..3011fbe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -278,6 +278,19 @@ public class ServerState implements Closeable {
     return log;
   }
 
+  public TermIndex getLastEntry() {
+    TermIndex lastEntry = getLog().getLastEntryTermIndex();
+    if (lastEntry == null) {
+      // lastEntry may need to be derived from snapshot
+      SnapshotInfo snapshot = getLatestSnapshot();
+      if (snapshot != null) {
+        lastEntry = snapshot.getTermIndex();
+      }
+    }
+
+    return lastEntry;
+  }
+
   void appendLog(TransactionContext operation) throws StateMachineException {
     log.append(currentTerm.get(), operation);
     Objects.requireNonNull(operation.getLogEntry());
@@ -318,11 +331,8 @@ public class ServerState implements Closeable {
     return false;
   }
 
-  int compareLog(TermIndex candidateLastEntry) {
-    TermIndex local = log.getLastEntryTermIndex();
-    // need to take into account snapshot
-    SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-    if (local == null && snapshot == null) {
+  static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
+    if (lastEntry == null) {
       // If the lastEntry of candidate is null, the proto will transfer an 
empty TermIndexProto,
       // then term and index of candidateLastEntry will both be 0.
       // Besides, candidateLastEntry comes from proto now, it never be null.
@@ -336,10 +346,8 @@ public class ServerState implements Closeable {
     } else if (candidateLastEntry == null) {
       return 1;
     }
-    if (local == null || (snapshot != null && snapshot.getIndex() > 
local.getIndex())) {
-      local = snapshot.getTermIndex();
-    }
-    return local.compareTo(candidateLastEntry);
+
+    return lastEntry.compareTo(candidateLastEntry);
   }
 
   @Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 903f903..fa9aca7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -136,10 +136,26 @@ public abstract class GroupManagementBaseTest extends BaseTest {
       Assert.assertTrue(leader.getId() != 
peers.get(suggestedLeaderIndex).getId());
     }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), 
"testMultiGroupWithPriority", LOG);
 
+    // send request so that suggested leader's log lag behind new leader's,
+    // when suggested leader rejoin cluster, it will catch up log first.
+    try (final RaftClient client = cluster.createClient(newGroup)) {
+      for (int i = 0; i < 10; i ++) {
+        RaftClientReply reply = client.send(new RaftTestUtil.SimpleMessage("m" 
+ i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+    }
+
     
BlockRequestHandlingInjection.getInstance().unblockRequestor(suggestedLeader);
     
BlockRequestHandlingInjection.getInstance().unblockReplier(suggestedLeader);
     cluster.setBlockRequestsFrom(suggestedLeader, false);
 
+    // suggested leader with highest priority rejoin cluster, then current 
leader will yield
+    // leadership to suggested leader when suggested leader catch up the log.
+    JavaUtils.attempt(() -> {
+      RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, 
newGroup.getGroupId());
+      Assert.assertTrue(leader.getId() == 
peers.get(suggestedLeaderIndex).getId());
+    }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), 
"testMultiGroupWithPriority", LOG);
+
     cluster.shutdown();
   }
 

Reply via email to