This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit f18bce302cfc847ba9f0b83a56a7e2674b50830f Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Thu Jan 19 22:35:24 2023 +0800 RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811) (cherry picked from commit 8c16c28351576b893564c5cc621c7d069b1de14a) --- .../apache/ratis/server/impl/LeaderStateImpl.java | 68 ++++++++++------------ 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 43349354c..42ceb8510 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -657,7 +657,7 @@ class LeaderStateImpl implements LeaderState { return pendingStepDown.submitAsync(request); } - private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId follower, TermIndex lastEntry) { + private synchronized void sendStartLeaderElection(RaftPeerId follower, TermIndex lastEntry) { ServerState state = server.getState(); TermIndex currLastEntry = state.getLastEntry(); if (ServerState.compareLog(currLastEntry, lastEntry) != 0) { @@ -665,6 +665,8 @@ class LeaderStateImpl implements LeaderState { "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry); return; } + LOG.info("{}: send StartLeaderElectionRequest to follower {} on term {}, lastEntry={}", + this, follower, currentTerm, lastEntry); final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto( server.getMemberId(), follower, lastEntry); @@ -681,6 +683,22 @@ class LeaderStateImpl implements LeaderState { }); } + boolean sendStartLeaderElection(FollowerInfo followerInfo) { + final RaftPeerId followerId = followerInfo.getPeer().getId(); + final TermIndex leaderLastEntry = server.getState().getLastEntry(); + if (leaderLastEntry == null) { + sendStartLeaderElection(followerId, null); + return true; + } + + final long followerMatchIndex = followerInfo.getMatchIndex(); + if (followerMatchIndex >= leaderLastEntry.getIndex()) { + sendStartLeaderElection(followerId, leaderLastEntry); + return true; + } + return false; + } + private void prepare() { synchronized (server) { if (isRunning()) { @@ -716,9 +734,8 @@ class LeaderStateImpl implements LeaderState { event.execute(); } else if (inStagingState()) { checkStaging(); - } else { - yieldLeaderToHigherPriorityPeer(); - checkLeadership(); + } else if (checkLeadership()) { + checkPeersForYieldingLeader(); } } } @@ -986,52 +1003,31 @@ class LeaderStateImpl implements LeaderState { return indices; } - private void yieldLeaderToHigherPriorityPeer() { - if (!server.getInfo().isLeader()) { - return; - } - + private void checkPeersForYieldingLeader() { final RaftConfigurationImpl conf = server.getRaftConf(); final RaftPeer leader = conf.getPeer(server.getId()); if (leader == null) { LOG.error("{} the leader {} is not in the conf {}", this, server.getId(), conf); return; } - int leaderPriority = leader.getPriority(); + final int leaderPriority = leader.getPriority(); + FollowerInfo highestPriorityInfo = null; + int highestPriority = Integer.MIN_VALUE; for (LogAppender logAppender : senders.getSenders()) { - final FollowerInfo followerInfo = logAppender.getFollower(); - final RaftPeerId followerID = followerInfo.getPeer().getId(); - final RaftPeer follower = conf.getPeer(followerID); + final RaftPeer follower = conf.getPeer(logAppender.getFollowerId()); if (follower == null) { - if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) { - LOG.error("{} the follower {} is not in the conf {}", this, followerID, conf); - } continue; } final int followerPriority = follower.getPriority(); - if (followerPriority <= leaderPriority) { - continue; - } - final TermIndex leaderLastEntry = server.getState().getLastEntry(); - if (leaderLastEntry == null) { - LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " + - "is higher than leader's:{} and leader's lastEntry is null", - this, followerID, currentTerm, followerPriority, leaderPriority); - - sendStartLeaderElectionToHigherPriorityPeer(followerID, null); - return; - } - - if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) { - LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " + - "is higher than leader's:{} and follower's lastEntry index:{} catch up with leader's:{}", - this, followerID, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(), - leaderLastEntry.getIndex()); - sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry); - return; + if (followerPriority > leaderPriority && followerPriority >= highestPriority) { + highestPriority = followerPriority; + highestPriorityInfo = logAppender.getFollower(); } } + if (highestPriorityInfo != null) { + sendStartLeaderElection(highestPriorityInfo); + } } /**
