This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/snapshot-3 by this push:
new b09bfe771 RATIS-2172. RaftServer may lose FollowerState (#1166)
b09bfe771 is described below
commit b09bfe771c24e75221cd9625624fdb991b14e9fa
Author: 133tosakarin <[email protected]>
AuthorDate: Sat Oct 12 05:55:35 2024 +0800
RATIS-2172. RaftServer may lose FollowerState (#1166)
---
.../apache/ratis/util/CodeInjectionForTesting.java | 5 ++
.../apache/ratis/server/impl/FollowerState.java | 13 ++--
.../apache/ratis/server/impl/LeaderElection.java | 4 --
.../apache/ratis/server/impl/RaftServerImpl.java | 6 +-
.../ratis/server/impl/LeaderElectionTests.java | 77 ++++++++++++++++++++++
5 files changed, 96 insertions(+), 9 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
index a7d36ac0e..112f6bd25 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
@@ -68,4 +68,9 @@ public final class CodeInjectionForTesting {
}
return code.execute(localId, remoteId, args);
}
+
+ /**
+ * Remove an injection point: unregister the code previously installed for
+ * {@code injectionPoint} (e.g. via put). Because the registry is static,
+ * a test that installs an injection should call this when it finishes,
+ * preferably from a finally block, so later tests are not affected.
+ * NOTE(review): presumably a no-op when nothing is registered; this depends
+ * on the type of INJECTION_POINTS, which is declared outside this hunk.
+ *
+ * @param injectionPoint the name of the injection point to remove.
+ */
+ public static void remove(String injectionPoint) {
+ INJECTION_POINTS.remove(injectionPoint);
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index fa61e9088..1be160f18 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -133,6 +133,14 @@ class FollowerState extends Daemon {
}
}
+  /**
+   * Decide whether this follower should stop waiting for the leader and change to CANDIDATE.
+   * The conditions are checked in order and the check stops at the first one that fails.
+   *
+   * @param electionTimeout how long without an RPC before the leader is considered lost.
+   * @return true iff there is no outstanding operation, this FollowerState is running,
+   *         the server is still a follower, at least {@code electionTimeout} has elapsed
+   *         since the last RPC, lostMajorityHeartbeatsRecently() is false,
+   *         and server.isRunning() is true.
+   */
+  private boolean roleChangeChecking(TimeDuration electionTimeout) {
+    // An outstanding operation, a stopped FollowerState or a non-follower role vetoes the change.
+    if (outstandingOp.get() != 0 || !isRunning || !server.getInfo().isFollower()) {
+      return false;
+    }
+    // Heard from the leader within the election timeout: keep following.
+    if (lastRpcTime.elapsedTime().compareTo(electionTimeout) < 0) {
+      return false;
+    }
+    if (lostMajorityHeartbeatsRecently()) {
+      return false;
+    }
+    // Checked last. NOTE(review): presumably false until the server finishes starting
+    // (RATIS-2172), so a server that is not yet running never starts an election.
+    return server.isRunning();
+  }
+
private void runImpl() {
final TimeDuration sleepDeviationThreshold =
server.getSleepDeviationThreshold();
while (shouldRun()) {
@@ -149,10 +157,7 @@ class FollowerState extends Daemon {
break;
}
synchronized (server) {
- if (outstandingOp.get() == 0
- && isRunning && server.getInfo().isFollower()
- && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
- && !lostMajorityHeartbeatsRecently()) {
+ if (roleChangeChecking(electionTimeout)) {
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{},
electionTimeout:{}",
this, lastRpcTime.elapsedTime(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); //
Update timeout metric counters.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a5bfba7be..4badd09cd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -253,10 +253,6 @@ class LeaderElection implements Runnable {
}
try (AutoCloseable ignored =
Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
- if (!server.isRunning()) {
- LOG.info("{}: skip since the server is not running", this);
- return;
- }
for (int round = 0; shouldRun(); round++) {
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
if (askForVotes(Phase.ELECTION, round)) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ae158ad75..1ac62fd98 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -160,6 +160,7 @@ class RaftServerImpl implements RaftServer.Division,
static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction";
static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
static final String START_LEADER_ELECTION = CLASS_NAME +
".startLeaderElection";
+ static final String START_COMPLETE = CLASS_NAME + ".startComplete";
class Info implements DivisionInfo {
@Override
@@ -400,7 +401,10 @@ class RaftServerImpl implements RaftServer.Division,
jmxAdapter.registerMBean();
state.start();
- startComplete.compareAndSet(false, true);
+ CodeInjectionForTesting.execute(START_COMPLETE, getId(), null, role);
+ if (startComplete.compareAndSet(false, true)) {
+ LOG.info("{}: Successfully started.", getMemberId());
+ }
return true;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b29b537ab..b175ffe29 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.server.DivisionInfo;
@@ -44,6 +45,8 @@ import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -97,6 +100,80 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
cluster.shutdown();
}
+  /**
+   * Injected code that blocks the calling thread for a fixed time.
+   * It is used to simulate a RaftServer whose startup is slow.
+   */
+  static class SleepCode implements CodeInjectionForTesting.Code {
+    /** How long each {@link #execute} call sleeps, in milliseconds. */
+    private final long sleepMs;
+
+    SleepCode(long sleepMs) {
+      this.sleepMs = sleepMs;
+    }
+
+    /**
+     * Log and then sleep for {@code sleepMs} milliseconds on the calling thread.
+     *
+     * @return always true.
+     * @throws RuntimeException if interrupted while sleeping; the thread's interrupt status
+     *         is restored before throwing.
+     */
+    @Override
+    public boolean execute(Object localId, Object remoteId, Object... args) {
+      try {
+        LOG.info("{}: Simulate RaftServer startup blocking", localId);
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+        // Thread.sleep clears the interrupt flag when it throws. Restore it so callers
+        // further up the stack can still see that the thread was interrupted.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Verifies that a single-server cluster still elects a leader when RaftServerImpl.start()
+   * is delayed just before startup completes. It also checks that the election does not
+   * finish earlier than the injected delay.
+   */
+  @Test
+  public void testWaitServerReady() throws Exception {
+    final int sleepMs = 1000 + ThreadLocalRandom.current().nextInt(1000);
+    LOG.info("Running testWaitServerReady, sleep = {}ms", sleepMs);
+    // Block RaftServerImpl.start() right before it sets startComplete.
+    CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(sleepMs));
+    try {
+      final MiniRaftCluster cluster = newCluster(1);
+      try {
+        final Timestamp startTime = Timestamp.currentTime();
+        cluster.start();
+        LOG.info("Cluster started at {}ms", startTime.elapsedTimeMs());
+        final RaftGroupId groupId = cluster.getGroupId();
+        final RaftServerImpl server =
+            (RaftServerImpl) cluster.getServers().iterator().next().getDivision(groupId);
+        final boolean isRunning = server.isRunning();
+        LOG.info("{} isRunning at {}ms? {}", server.getId(), startTime.elapsedTimeMs(), isRunning);
+
+        // Leader will be elected if the server is ready
+        Assertions.assertNotNull(waitForLeader(cluster), "No leader is elected.");
+        final long elapsedMs = startTime.elapsedTimeMs();
+        // allow a small difference to tolerate system timer inaccuracy
+        Assertions.assertTrue(elapsedMs > sleepMs - 10,
+            () -> "elapsedMs = " + elapsedMs + " but sleepMs = " + sleepMs);
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      // The injection registry is static: always clear it, even when an assertion fails,
+      // so that subsequent tests in the same JVM are not slowed down or broken.
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
+    }
+  }
+
+  /**
+   * Verifies that servers added through setConfiguration(ADD) are all running once the
+   * reconfiguration succeeds, even when their startup is delayed via START_COMPLETE.
+   */
+  @Test
+  public void testAddServerForWaitReady() throws IOException, InterruptedException {
+    LOG.info("Running testAddServerForWaitReady");
+    // Start a normal cluster with 3 servers.
+    final MiniRaftCluster cluster = newCluster(3);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient()) {
+        for (int i = 0; i < 10; ++i) {
+          final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i));
+          Assertions.assertTrue(reply.isSuccess());
+        }
+        // Add 2 new servers, each blocked for 2 seconds before its startup completes.
+        CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000));
+        final MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(2, true, false);
+        LOG.info("add 2 new servers");
+        LOG.info(cluster.printServers());
+        final RaftClientReply reply = client.admin().setConfiguration(
+            SetConfigurationRequest.Arguments.newBuilder()
+                .setServersInNewConf(peerChanges.newPeers)
+                .setMode(SetConfigurationRequest.Mode.ADD)
+                .build());
+        Assertions.assertTrue(reply.isSuccess());
+        // Every division, including the newly added ones, must report that it is running.
+        for (RaftServer server : cluster.getServers()) {
+          final RaftServerProxy proxy = (RaftServerProxy) server;
+          proxy.getImpls().forEach(s -> Assertions.assertTrue(s.isRunning()));
+        }
+      }
+    } finally {
+      cluster.shutdown();
+      // The injection registry is static: clear it even on failure so other tests are unaffected.
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
+    }
+  }
+
@Test
public void testChangeLeader() throws Exception {
SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);