This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5038b4ca1 RATIS-1762. Support transfer leadership between nodes with same priority (#807)
5038b4ca1 is described below

commit 5038b4ca1b7611d96c2eaf824eadab592e714fb5
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Feb 3 04:36:20 2023 +0800

    RATIS-1762. Support transfer leadership between nodes with same priority (#807)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  5 +++
 .../ratis/server/impl/RaftConfigurationImpl.java   |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  6 ++-
 .../ratis/server/impl/TransferLeadership.java      | 49 ++++++++++++++++++++--
 .../ratis/server/impl/LeaderElectionTests.java     | 44 +++++++++++++++----
 5 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index a3c0cc28d..af4ba0ada 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -778,6 +778,7 @@ class LeaderStateImpl implements LeaderState {
     } else {
       eventQueue.submit(checkStagingEvent);
     }
+    server.getTransferLeadership().onFollowerAppendEntriesReply(this, 
follower);
   }
 
   @Override
@@ -1239,6 +1240,10 @@ class LeaderStateImpl implements LeaderState {
     return StreamSupport.stream(senders.spliterator(), false);
   }
 
+  Optional<LogAppender> getLogAppender(RaftPeerId id) {
+    return getLogAppenders().filter(a -> 
a.getFollowerId().equals(id)).findAny();
+  }
+
   private static boolean isAttendingVote(FollowerInfo follower) {
     return ((FollowerInfoImpl)follower).isAttendingVote();
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 3e53451f0..da9481a2e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -172,7 +172,7 @@ final class RaftConfigurationImpl implements RaftConfiguration {
     }
     Collection<RaftPeer> peers = getCurrentPeers();
     for (RaftPeer peer : peers) {
-      if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {
+      if (peer.getPriority() > target.getPriority() && !peer.equals(target)) {
         return false;
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fc87f9719..a09434f49 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -317,6 +317,10 @@ class RaftServerImpl implements RaftServer.Division,
     return proxy;
   }
 
+  TransferLeadership getTransferLeadership() {
+    return transferLeadership;
+  }
+
   RaftServerRpc getServerRpc() {
     return proxy.getServerRpc();
   }
@@ -1116,7 +1120,7 @@ class RaftServerImpl implements RaftServer.Division,
         return logAndReturnTransferLeadershipFail(request, msg);
       }
 
-      return transferLeadership.start(request);
+      return transferLeadership.start(leaderState, request);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 3aed1a10d..9fe9081f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -21,6 +21,8 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.server.leader.FollowerInfo;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
@@ -81,11 +83,47 @@ public class TransferLeadership {
     this.server = server;
   }
 
+  private Optional<RaftPeerId> getTransferee() {
+    return Optional.ofNullable(pending.get())
+        .map(r -> r.getRequest().getNewLeader());
+  }
+
   boolean isSteppingDown() {
     return pending.get() != null;
   }
 
-  CompletableFuture<RaftClientReply> start(TransferLeadershipRequest request) {
+  void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo 
follower) {
+    final Optional<RaftPeerId> transferee = getTransferee();
+    // If TransferLeadership is in progress, and the transferee has just 
append some entries
+    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()) {
+      // If the transferee is up-to-date, send StartLeaderElection to it
+      if (leaderState.sendStartLeaderElection(follower)) {
+        LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
+            server.getMemberId(), transferee.get());
+      }
+    }
+  }
+
+  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId 
transferee) {
+    LOG.info("{}: start transferring leadership to {}", server.getMemberId(), 
transferee);
+    final LogAppender appender = 
leaderState.getLogAppender(transferee).orElse(null);
+
+    if (appender == null) {
+      LOG.error("{}: cannot find LogAppender for transferee {}", 
server.getMemberId(), transferee);
+      return;
+    }
+    final FollowerInfo follower = appender.getFollower();
+    if (leaderState.sendStartLeaderElection(follower)) {
+      LOG.info("{}: sent StartLeaderElection to transferee {} immediately as 
it already has up-to-date log",
+          server.getMemberId(), transferee);
+    } else {
+      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date",
+          server.getMemberId(), transferee);
+      appender.notifyLogAppender();
+    }
+  }
+
+  CompletableFuture<RaftClientReply> start(LeaderStateImpl leaderState, 
TransferLeadershipRequest request) {
     final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> 
new PendingRequest(request));
     final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: 
supplier.get());
     if (previous != null) {
@@ -106,10 +144,13 @@ public class TransferLeadership {
         return 
CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
       }
     }
+    tryTransferLeadership(leaderState, request.getNewLeader());
 
-    scheduler.onTimeout(TimeDuration.valueOf(request.getTimeoutMs(), 
TimeUnit.MILLISECONDS),
-        () -> finish(server.getState().getLeaderId(), true),
-        LOG, () -> "Timeout check failed for append entry request: " + 
request);
+    // if timeout is not specified in request, default to random election 
timeout
+    final TimeDuration timeout = request.getTimeoutMs() == 0 ? 
server.getRandomElectionTimeout()
+        : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
+    scheduler.onTimeout(timeout, () -> finish(server.getState().getLeaderId(), 
true),
+        LOG, () -> "Failed to transfer leadership to " + 
request.getNewLeader() + ": timeout after " + timeout);
     return supplier.get().getReplyFuture();
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b41f2bef8..e33c2a512 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -182,7 +182,35 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         client.io().send(new RaftTestUtil.SimpleMessage("message"));
 
         List<RaftServer.Division> followers = cluster.getFollowers();
-        Assert.assertEquals(followers.size(), 2);
+        Assert.assertEquals(2, followers.size());
+        RaftServer.Division newLeader = followers.get(0);
+
+        RaftClientReply reply = 
client.admin().transferLeadership(newLeader.getId(), 20000);
+        Assert.assertTrue(reply.isSuccess());
+
+        final RaftServer.Division currLeader = waitForLeader(cluster);
+        Assert.assertEquals(newLeader.getId(), currLeader.getId());
+
+        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertEquals(newLeader.getId().toString(), 
reply.getReplierId());
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testYieldLeaderToHigherPriority() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+
+        List<RaftServer.Division> followers = cluster.getFollowers();
+        Assert.assertEquals(2, followers.size());
         RaftServer.Division newLeader = followers.get(0);
 
         List<RaftPeer> peers = cluster.getPeers();
@@ -190,14 +218,15 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         RaftClientReply reply = 
client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
         Assert.assertTrue(reply.isSuccess());
 
-        reply = client.admin().transferLeadership(newLeader.getId(), 20000);
-        assertTrue(reply.isSuccess());
+        // Wait the old leader to step down.
+        // TODO: make it more deterministic.
+        TimeDuration.valueOf(1, TimeUnit.SECONDS).sleep();
 
         final RaftServer.Division currLeader = waitForLeader(cluster);
-        assertTrue(newLeader.getId() == currLeader.getId());
+        Assert.assertEquals(newLeader.getId(), currLeader.getId());
 
         reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
-        
Assert.assertTrue(reply.getReplierId().equals(newLeader.getId().toString()));
+        Assert.assertEquals(newLeader.getId().toString(), 
reply.getReplierId());
         Assert.assertTrue(reply.isSuccess());
       }
 
@@ -220,9 +249,6 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         isolate(cluster, newLeader.getId());
 
         List<RaftPeer> peers = cluster.getPeers();
-        List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, 
newLeader.getPeer());
-        RaftClientReply reply = 
client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
-        Assert.assertTrue(reply.isSuccess());
 
         CompletableFuture<Boolean> transferTimeoutFuture = 
CompletableFuture.supplyAsync(() -> {
           try {
@@ -256,7 +282,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         Assert.assertTrue(transferTimeoutFuture.get());
 
         // after transfer timeout, leader should accept request
-        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("message"));
         
Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
         Assert.assertTrue(reply.isSuccess());
 

Reply via email to