This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/snapshot-3 by this push:
     new b09bfe771 RATIS-2172. RaftServer may lose FollowerState (#1166)
b09bfe771 is described below

commit b09bfe771c24e75221cd9625624fdb991b14e9fa
Author: 133tosakarin <[email protected]>
AuthorDate: Sat Oct 12 05:55:35 2024 +0800

    RATIS-2172. RaftServer may lose FollowerState (#1166)
---
 .../apache/ratis/util/CodeInjectionForTesting.java |  5 ++
 .../apache/ratis/server/impl/FollowerState.java    | 13 ++--
 .../apache/ratis/server/impl/LeaderElection.java   |  4 --
 .../apache/ratis/server/impl/RaftServerImpl.java   |  6 +-
 .../ratis/server/impl/LeaderElectionTests.java     | 77 ++++++++++++++++++++++
 5 files changed, 96 insertions(+), 9 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java 
b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
index a7d36ac0e..112f6bd25 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
@@ -68,4 +68,9 @@ public final class CodeInjectionForTesting {
     }
     return code.execute(localId, remoteId, args);
   }
+
+  /**
+   * Remove an injection point.
+   *
+   * <p>Injection points live in static state shared by the whole JVM, so a test that
+   * installs code should remove it afterwards (ideally in a {@code finally} block);
+   * otherwise the injected code keeps running in later tests.
+   *
+   * @param injectionPoint the name of the injection point to remove
+   */
+  public static void remove(String injectionPoint) {
+    INJECTION_POINTS.remove(injectionPoint);
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index fa61e9088..1be160f18 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -133,6 +133,14 @@ class FollowerState extends Daemon {
     }
   }
 
+  /**
+   * Decides whether this follower should change to CANDIDATE.
+   * Invoked from {@code runImpl()} while holding the {@code server} lock.
+   *
+   * @param electionTimeout minimum time since the last RPC before an election may start
+   * @return true only if there is no outstanding operation, this {@code FollowerState} is running,
+   *         the server is a follower, at least {@code electionTimeout} has elapsed since the last RPC,
+   *         majority heartbeats have not been lost recently, and the server itself is running
+   */
+  private boolean roleChangeChecking(TimeDuration electionTimeout) {
+    // Conditions are evaluated in a fixed order; the remaining ones are skipped once one fails.
+    if (outstandingOp.get() != 0 || !isRunning) {
+      return false;
+    }
+    if (!server.getInfo().isFollower()) {
+      return false;
+    }
+    final boolean timedOut = lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0;
+    if (!timedOut || lostMajorityHeartbeatsRecently()) {
+      return false;
+    }
+    // Never change to CANDIDATE while the server itself is not running.
+    return server.isRunning();
+  }
+
   private void runImpl() {
     final TimeDuration sleepDeviationThreshold = 
server.getSleepDeviationThreshold();
     while (shouldRun()) {
@@ -149,10 +157,7 @@ class FollowerState extends Daemon {
           break;
         }
         synchronized (server) {
-          if (outstandingOp.get() == 0
-              && isRunning && server.getInfo().isFollower()
-              && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
-              && !lostMajorityHeartbeatsRecently()) {
+          if (roleChangeChecking(electionTimeout)) {
             LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, 
electionTimeout:{}",
                 this, lastRpcTime.elapsedTime(), electionTimeout);
             server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // 
Update timeout metric counters.
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a5bfba7be..4badd09cd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -253,10 +253,6 @@ class LeaderElection implements Runnable {
     }
 
     try (AutoCloseable ignored = 
Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
-      if (!server.isRunning()) {
-        LOG.info("{}: skip since the server is not running", this);
-        return;
-      }
       for (int round = 0; shouldRun(); round++) {
         if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
           if (askForVotes(Phase.ELECTION, round)) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ae158ad75..1ac62fd98 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -160,6 +160,7 @@ class RaftServerImpl implements RaftServer.Division,
   static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction";
   static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
   static final String START_LEADER_ELECTION = CLASS_NAME + 
".startLeaderElection";
+  /** Code-injection point executed right before {@code startComplete} is set to true. */
+  static final String START_COMPLETE = CLASS_NAME + ".startComplete";
 
   class Info implements DivisionInfo {
     @Override
@@ -400,7 +401,10 @@ class RaftServerImpl implements RaftServer.Division,
 
     jmxAdapter.registerMBean();
     state.start();
-    startComplete.compareAndSet(false, true);
+    CodeInjectionForTesting.execute(START_COMPLETE, getId(), null, role);
+    if (startComplete.compareAndSet(false, true)) {
+      LOG.info("{}: Successfully started.", getMemberId());
+    }
     return true;
   }
 
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b29b537ab..b175ffe29 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.server.DivisionInfo;
@@ -44,6 +45,8 @@ import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -97,6 +100,80 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
     cluster.shutdown();
   }
 
+  /**
+   * Injected code that blocks the calling thread for a fixed number of milliseconds,
+   * used to simulate a slow RaftServer startup. Always returns {@code true}.
+   */
+  static class SleepCode implements CodeInjectionForTesting.Code {
+    private final long sleepMs;
+
+    SleepCode(long sleepMs) {
+      this.sleepMs = sleepMs;
+    }
+
+    @Override
+    public boolean execute(Object localId, Object remoteId, Object... args) {
+      LOG.info("{}: Simulate RaftServer startup blocking", localId);
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so code further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      return true;
+    }
+  }
+
+  @Test
+  public void testWaitServerReady() throws Exception {
+    final int sleepMs = 1000 + ThreadLocalRandom.current().nextInt(1000);
+    LOG.info("Running testWaitServerReady, sleep = {}ms", sleepMs);
+    // Block server startup right before it is marked complete.
+    CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(sleepMs));
+    try {
+      final MiniRaftCluster cluster = newCluster(1);
+      try {
+        final Timestamp startTime = Timestamp.currentTime();
+        cluster.start();
+        LOG.info("Cluster started at {}ms", startTime.elapsedTimeMs());
+        final RaftGroupId groupId = cluster.getGroupId();
+        final RaftServerImpl server = (RaftServerImpl) cluster.getServers().iterator().next().getDivision(groupId);
+        final boolean isRunning = server.isRunning();
+        LOG.info("{} isRunning at {}ms? {}", server.getId(), startTime.elapsedTimeMs(), isRunning);
+
+        // Leader will be elected if the server is ready
+        Assertions.assertNotNull(waitForLeader(cluster), "No leader is elected.");
+        final long elapsedMs = startTime.elapsedTimeMs();
+        // allow a small difference to tolerate system timer inaccuracy
+        Assertions.assertTrue(elapsedMs > sleepMs - 10,
+            () -> "elapsedMs = " + elapsedMs + " but sleepMs = " + sleepMs);
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      // The injection is static, JVM-wide state: always remove it so later tests are not slowed down.
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
+    }
+  }
+
+  @Test
+  public void testAddServerForWaitReady() throws IOException, InterruptedException {
+    LOG.info("Running testAddServerForWaitReady");
+    // Start a normal 3-server cluster; no startup delay is injected yet.
+    final MiniRaftCluster cluster = newCluster(3);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient()) {
+        for (int i = 0; i < 10; ++i) {
+          final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i));
+          Assertions.assertTrue(reply.isSuccess());
+        }
+        // Delay the startup completion of the servers added below, then add them.
+        CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000));
+        final int numNewPeers = 2;
+        final MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(numNewPeers, true, false);
+        LOG.info("Added {} new servers", numNewPeers);
+        LOG.info(cluster.printServers());
+        final RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
+            .setServersInNewConf(peerChanges.newPeers)
+            .setMode(SetConfigurationRequest.Mode.ADD).build());
+        Assertions.assertTrue(reply.isSuccess());
+        for (RaftServer server : cluster.getServers()) {
+          final RaftServerProxy proxy = (RaftServerProxy) server;
+          proxy.getImpls().forEach(s -> Assertions.assertTrue(s.isRunning(), () -> s + " is not running"));
+        }
+      }
+    } finally {
+      // Remove the JVM-wide injection first (cannot throw), then stop the cluster.
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testChangeLeader() throws Exception {
     SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);

Reply via email to