This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5038b4ca1 RATIS-1762. Support transfer leadership between nodes with
same priority (#807)
5038b4ca1 is described below
commit 5038b4ca1b7611d96c2eaf824eadab592e714fb5
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Feb 3 04:36:20 2023 +0800
RATIS-1762. Support transfer leadership between nodes with same priority
(#807)
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 5 +++
.../ratis/server/impl/RaftConfigurationImpl.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 6 ++-
.../ratis/server/impl/TransferLeadership.java | 49 ++++++++++++++++++++--
.../ratis/server/impl/LeaderElectionTests.java | 44 +++++++++++++++----
5 files changed, 91 insertions(+), 15 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index a3c0cc28d..af4ba0ada 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -778,6 +778,7 @@ class LeaderStateImpl implements LeaderState {
} else {
eventQueue.submit(checkStagingEvent);
}
+ server.getTransferLeadership().onFollowerAppendEntriesReply(this,
follower);
}
@Override
@@ -1239,6 +1240,10 @@ class LeaderStateImpl implements LeaderState {
return StreamSupport.stream(senders.spliterator(), false);
}
+ Optional<LogAppender> getLogAppender(RaftPeerId id) {
+ return getLogAppenders().filter(a ->
a.getFollowerId().equals(id)).findAny();
+ }
+
private static boolean isAttendingVote(FollowerInfo follower) {
return ((FollowerInfoImpl)follower).isAttendingVote();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 3e53451f0..da9481a2e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -172,7 +172,7 @@ final class RaftConfigurationImpl implements
RaftConfiguration {
}
Collection<RaftPeer> peers = getCurrentPeers();
for (RaftPeer peer : peers) {
- if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {
+ if (peer.getPriority() > target.getPriority() && !peer.equals(target)) {
return false;
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fc87f9719..a09434f49 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -317,6 +317,10 @@ class RaftServerImpl implements RaftServer.Division,
return proxy;
}
+ TransferLeadership getTransferLeadership() {
+ return transferLeadership;
+ }
+
RaftServerRpc getServerRpc() {
return proxy.getServerRpc();
}
@@ -1116,7 +1120,7 @@ class RaftServerImpl implements RaftServer.Division,
return logAndReturnTransferLeadershipFail(request, msg);
}
- return transferLeadership.start(request);
+ return transferLeadership.start(leaderState, request);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 3aed1a10d..9fe9081f8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -21,6 +21,8 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.server.leader.FollowerInfo;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
@@ -81,11 +83,47 @@ public class TransferLeadership {
this.server = server;
}
+ private Optional<RaftPeerId> getTransferee() {
+ return Optional.ofNullable(pending.get())
+ .map(r -> r.getRequest().getNewLeader());
+ }
+
boolean isSteppingDown() {
return pending.get() != null;
}
- CompletableFuture<RaftClientReply> start(TransferLeadershipRequest request) {
+ void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo
follower) {
+ final Optional<RaftPeerId> transferee = getTransferee();
+ // If TransferLeadership is in progress, and the transferee has just
appended some entries
+ if (transferee.filter(t -> t.equals(follower.getId())).isPresent()) {
+ // If the transferee is up-to-date, send StartLeaderElection to it
+ if (leaderState.sendStartLeaderElection(follower)) {
+ LOG.info("{}: sent StartLeaderElection to transferee {} after received
AppendEntriesResponse",
+ server.getMemberId(), transferee.get());
+ }
+ }
+ }
+
+ private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId
transferee) {
+ LOG.info("{}: start transferring leadership to {}", server.getMemberId(),
transferee);
+ final LogAppender appender =
leaderState.getLogAppender(transferee).orElse(null);
+
+ if (appender == null) {
+ LOG.error("{}: cannot find LogAppender for transferee {}",
server.getMemberId(), transferee);
+ return;
+ }
+ final FollowerInfo follower = appender.getFollower();
+ if (leaderState.sendStartLeaderElection(follower)) {
+ LOG.info("{}: sent StartLeaderElection to transferee {} immediately as
it already has up-to-date log",
+ server.getMemberId(), transferee);
+ } else {
+ LOG.info("{}: notifying LogAppender to send AppendEntries as transferee
{} is not up-to-date",
+ server.getMemberId(), transferee);
+ appender.notifyLogAppender();
+ }
+ }
+
+ CompletableFuture<RaftClientReply> start(LeaderStateImpl leaderState,
TransferLeadershipRequest request) {
final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() ->
new PendingRequest(request));
final PendingRequest previous = pending.getAndUpdate(f -> f != null? f:
supplier.get());
if (previous != null) {
@@ -106,10 +144,13 @@ public class TransferLeadership {
return
CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
}
}
+ tryTransferLeadership(leaderState, request.getNewLeader());
- scheduler.onTimeout(TimeDuration.valueOf(request.getTimeoutMs(),
TimeUnit.MILLISECONDS),
- () -> finish(server.getState().getLeaderId(), true),
- LOG, () -> "Timeout check failed for append entry request: " +
request);
+ // if timeout is not specified in request, default to random election
timeout
+ final TimeDuration timeout = request.getTimeoutMs() == 0 ?
server.getRandomElectionTimeout()
+ : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
+ scheduler.onTimeout(timeout, () -> finish(server.getState().getLeaderId(),
true),
+ LOG, () -> "Failed to transfer leadership to " +
request.getNewLeader() + ": timeout after " + timeout);
return supplier.get().getReplyFuture();
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b41f2bef8..e33c2a512 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -182,7 +182,35 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
client.io().send(new RaftTestUtil.SimpleMessage("message"));
List<RaftServer.Division> followers = cluster.getFollowers();
- Assert.assertEquals(followers.size(), 2);
+ Assert.assertEquals(2, followers.size());
+ RaftServer.Division newLeader = followers.get(0);
+
+ RaftClientReply reply =
client.admin().transferLeadership(newLeader.getId(), 20000);
+ Assert.assertTrue(reply.isSuccess());
+
+ final RaftServer.Division currLeader = waitForLeader(cluster);
+ Assert.assertEquals(newLeader.getId(), currLeader.getId());
+
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ Assert.assertEquals(newLeader.getId().toString(),
reply.getReplierId());
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testYieldLeaderToHigherPriority() throws Exception {
+ try(final MiniRaftCluster cluster = newCluster(3)) {
+ cluster.start();
+
+ final RaftServer.Division leader = waitForLeader(cluster);
+ try (RaftClient client = cluster.createClient(leader.getId())) {
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
+
+ List<RaftServer.Division> followers = cluster.getFollowers();
+ Assert.assertEquals(2, followers.size());
RaftServer.Division newLeader = followers.get(0);
List<RaftPeer> peers = cluster.getPeers();
@@ -190,14 +218,15 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
RaftClientReply reply =
client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
- reply = client.admin().transferLeadership(newLeader.getId(), 20000);
- assertTrue(reply.isSuccess());
+ // Wait for the old leader to step down.
+ // TODO: make it more deterministic.
+ TimeDuration.valueOf(1, TimeUnit.SECONDS).sleep();
final RaftServer.Division currLeader = waitForLeader(cluster);
- assertTrue(newLeader.getId() == currLeader.getId());
+ Assert.assertEquals(newLeader.getId(), currLeader.getId());
reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
-
Assert.assertTrue(reply.getReplierId().equals(newLeader.getId().toString()));
+ Assert.assertEquals(newLeader.getId().toString(),
reply.getReplierId());
Assert.assertTrue(reply.isSuccess());
}
@@ -220,9 +249,6 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
isolate(cluster, newLeader.getId());
List<RaftPeer> peers = cluster.getPeers();
- List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers,
newLeader.getPeer());
- RaftClientReply reply =
client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
- Assert.assertTrue(reply.isSuccess());
CompletableFuture<Boolean> transferTimeoutFuture =
CompletableFuture.supplyAsync(() -> {
try {
@@ -256,7 +282,7 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
Assert.assertTrue(transferTimeoutFuture.get());
// after transfer timeout, leader should accept request
- reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
Assert.assertTrue(reply.isSuccess());