This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 052e109 RATIS-1034. Change leader heartbeat to yield to a higher priority leader (#184)
052e109 is described below
commit 052e109bf33bedaf6754c6a26f55c2eb1d9dbae9
Author: runzhiwang <[email protected]>
AuthorDate: Mon Aug 31 12:52:19 2020 +0800
RATIS-1034. Change leader heartbeat to yield to a higher priority leader (#184)
---
.../apache/ratis/server/impl/LeaderElection.java | 10 +---
.../org/apache/ratis/server/impl/LeaderState.java | 59 ++++++++++++++++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../org/apache/ratis/server/impl/ServerState.java | 26 ++++++----
.../ratis/server/impl/GroupManagementBaseTest.java | 16 ++++++
5 files changed, 94 insertions(+), 19 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a85c1a5..27e5a28 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -22,7 +22,6 @@ import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.statemachine.SnapshotInfo;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.LifeCycle;
@@ -210,14 +209,7 @@ class LeaderElection implements Runnable {
}
LOG.info("{}: begin an election at term {} for {}", this, electionTerm,
conf);
- TermIndex lastEntry = state.getLog().getLastEntryTermIndex();
- if (lastEntry == null) {
- // lastEntry may need to be derived from snapshot
- SnapshotInfo snapshot = state.getLatestSnapshot();
- if (snapshot != null) {
- lastEntry = snapshot.getTermIndex();
- }
- }
+ TermIndex lastEntry = state.getLastEntry();
final ResultAndTerm r;
final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 14ad87a..11631e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -485,6 +485,18 @@ public class LeaderState {
}
}
+  /**
+   * Step down from leadership at the given term, but only if this server's last entry still
+   * matches the one the caller observed when it decided to step down.
+   *
+   * @param term the term at which to step down
+   * @param lastEntry the last entry sampled by the caller when the decision was made; may be null
+   */
+  private synchronized void stepDown(long term, TermIndex lastEntry) {
+    final ServerState state = server.getState();
+    final TermIndex currLastEntry = state.getLastEntry();
+    // If the log changed after the caller sampled lastEntry, the premise of the caller's
+    // decision (e.g. "the follower has caught up") no longer holds, so keep leadership.
+    if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
+      LOG.warn("{} can not stepDown because currLastEntry:{} did not match lastEntry:{}",
+          this, currLastEntry, lastEntry);
+      return;
+    }
+
+    stepDown(term);
+  }
+
private void prepare() {
synchronized (server) {
if (running) {
@@ -517,6 +529,7 @@ public class LeaderState {
} else if (inStagingState()) {
checkStaging();
} else {
+ yieldLeaderToHigherPriorityPeer();
checkLeadership();
}
}
@@ -794,12 +807,58 @@ public class LeaderState {
return lists;
}
+  /**
+   * Yield leadership to a follower whose priority is higher than this leader's.
+   *
+   * The leader steps down when such a follower exists and either this leader has no last
+   * entry at all, or that follower's match index has reached this leader's last entry index.
+   * The first qualifying follower, in sender order, triggers the step-down.
+   * Nothing happens if this server is no longer the leader.
+   */
+  private void yieldLeaderToHigherPriorityPeer() {
+    if (!server.getRole().isLeader()) {
+      return;
+    }
+
+    final RaftConfiguration conf = server.getRaftConf();
+    if (conf.getPeer(server.getId()) == null) {
+      // getPeer returns null for a peer absent from the conf; without our own priority
+      // there is nothing to compare against, so do not yield.
+      return;
+    }
+    final int leaderPriority = conf.getPeer(server.getId()).getPriority();
+
+    // Sample the last entry once and pass this same value to stepDown(term, lastEntry):
+    // that method refuses to step down if the log no longer matches the sample, which
+    // protects against entries appended after the catch-up check below.
+    final TermIndex leaderLastEntry = server.getState().getLastEntry();
+
+    for (LogAppender logAppender : senders.getSenders()) {
+      final FollowerInfo followerInfo = logAppender.getFollower();
+      final RaftPeerId followerID = followerInfo.getPeer().getId();
+      if (conf.getPeer(followerID) == null) {
+        // The follower is not part of the current conf; skip it instead of throwing an NPE.
+        LOG.warn("{}: skip follower {} since it is not in the conf {}", this, followerID, conf);
+        continue;
+      }
+      final int followerPriority = conf.getPeer(followerID).getPriority();
+
+      if (followerPriority <= leaderPriority) {
+        continue;
+      }
+
+      if (leaderLastEntry == null) {
+        LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
+                "and leader's lastEntry is null",
+            this, currentTerm, followerPriority, leaderPriority);
+
+        // step down as follower
+        stepDown(currentTerm, leaderLastEntry);
+        return;
+      }
+
+      if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
+        LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
+                "and follower's lastEntry index:{} catch up with leader's:{}",
+            this, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
+            leaderLastEntry.getIndex());
+
+        // step down as follower
+        stepDown(currentTerm, leaderLastEntry);
+        return;
+      }
+    }
+  }
+
/**
* See the thesis section 6.2: A leader in Raft steps down
* if an election timeout elapses without a successful
* round of heartbeats to a majority of its cluster.
*/
private void checkLeadership() {
+ if (!server.getRole().isLeader()) {
+ return;
+ }
+
// The initial value of lastRpcResponseTime in FollowerInfo is set by
// LeaderState::addSenders(), which is fake and used to trigger an
// immediate round of AppendEntries request. Since candidates collect
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f72dde1..79436d0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -880,7 +880,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// see Section 5.4.1 Election restriction
RaftPeer candidate = getRaftConf().getPeer(candidateId);
if (fs != null && candidate != null) {
- int compare = state.compareLog(candidateLastEntry);
+ int compare = ServerState.compareLog(state.getLastEntry(),
candidateLastEntry);
int priority = getRaftConf().getPeer(getId()).getPriority();
LOG.info("{} priority:{} candidate:{} candidatePriority:{}
compare:{}",
this, priority, candidate, candidate.getPriority(), compare);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 1213d14..3011fbe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -278,6 +278,19 @@ public class ServerState implements Closeable {
return log;
}
+  /**
+   * Get the term-index of this server's last entry, taking the latest snapshot into account.
+   *
+   * The snapshot's term-index is used when the log has no last entry, or when the snapshot
+   * index is ahead of the log's last entry index (the case the former compareLog handled).
+   *
+   * @return the last entry's term-index, or null if there is neither a log entry nor a snapshot
+   */
+  public TermIndex getLastEntry() {
+    final TermIndex logLastEntry = getLog().getLastEntryTermIndex();
+    // lastEntry may need to be derived from snapshot
+    final SnapshotInfo snapshot = getLatestSnapshot();
+    if (snapshot == null) {
+      return logLastEntry;
+    }
+    if (logLastEntry == null || snapshot.getIndex() > logLastEntry.getIndex()) {
+      return snapshot.getTermIndex();
+    }
+    return logLastEntry;
+  }
+
void appendLog(TransactionContext operation) throws StateMachineException {
log.append(currentTerm.get(), operation);
Objects.requireNonNull(operation.getLogEntry());
@@ -318,11 +331,8 @@ public class ServerState implements Closeable {
return false;
}
- int compareLog(TermIndex candidateLastEntry) {
- TermIndex local = log.getLastEntryTermIndex();
- // need to take into account snapshot
- SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
- if (local == null && snapshot == null) {
+ static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
+ if (lastEntry == null) {
// If the lastEntry of candidate is null, the proto will transfer an
empty TermIndexProto,
// then term and index of candidateLastEntry will both be 0.
// Besides, candidateLastEntry comes from proto now, it never be null.
@@ -336,10 +346,8 @@ public class ServerState implements Closeable {
} else if (candidateLastEntry == null) {
return 1;
}
- if (local == null || (snapshot != null && snapshot.getIndex() >
local.getIndex())) {
- local = snapshot.getTermIndex();
- }
- return local.compareTo(candidateLastEntry);
+
+ return lastEntry.compareTo(candidateLastEntry);
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 903f903..fa9aca7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -136,10 +136,26 @@ public abstract class GroupManagementBaseTest extends BaseTest {
Assert.assertTrue(leader.getId() !=
peers.get(suggestedLeaderIndex).getId());
}, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS),
"testMultiGroupWithPriority", LOG);
+ // send request so that suggested leader's log lag behind new leader's,
+ // when suggested leader rejoin cluster, it will catch up log first.
+ try (final RaftClient client = cluster.createClient(newGroup)) {
+ for (int i = 0; i < 10; i ++) {
+ RaftClientReply reply = client.send(new RaftTestUtil.SimpleMessage("m"
+ i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+ }
+
BlockRequestHandlingInjection.getInstance().unblockRequestor(suggestedLeader);
BlockRequestHandlingInjection.getInstance().unblockReplier(suggestedLeader);
cluster.setBlockRequestsFrom(suggestedLeader, false);
+ // suggested leader with highest priority rejoin cluster, then current
leader will yield
+ // leadership to suggested leader when suggested leader catch up the log.
+ JavaUtils.attempt(() -> {
+ RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster,
newGroup.getGroupId());
+ Assert.assertTrue(leader.getId() ==
peers.get(suggestedLeaderIndex).getId());
+ }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS),
"testMultiGroupWithPriority", LOG);
+
cluster.shutdown();
}