This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f0190f5 RATIS-1487. Implement pause election in server (#577)
f0190f5 is described below
commit f0190f57137a5caeffd28d4a5b265e37c74eac17
Author: Yaolong Liu <[email protected]>
AuthorDate: Tue Jan 18 17:47:06 2022 +0800
RATIS-1487. Implement pause election in server (#577)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 5 ++++
.../org/apache/ratis/server/impl/RoleInfo.java | 9 +++++++
.../ratis/server/impl/LeaderElectionTests.java | 28 ++++++++++++++++++++++
3 files changed, 42 insertions(+)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d7d45b5..21ed539 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1388,6 +1388,11 @@ class RaftServerImpl implements RaftServer.Division,
return reply;
}
+ void setLeaderElectionPause(boolean pause) throws ServerNotReadyException {
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+ role.setLeaderElectionPause(pause);
+ }
+
boolean pause() throws IOException {
// TODO: should pause() be limited on only working for a follower?
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 617b617..d09c801 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -44,6 +45,7 @@ class RoleInfo {
private final AtomicReference<FollowerState> followerState = new
AtomicReference<>();
/** Used when the peer is candidate, to request votes from other peers */
private final AtomicReference<LeaderElection> leaderElection = new
AtomicReference<>();
+ private final AtomicBoolean pauseLeaderElection = new AtomicBoolean(false);
private final AtomicReference<Timestamp> transitionTime;
@@ -112,9 +114,16 @@ class RoleInfo {
}
void startLeaderElection(RaftServerImpl server, boolean force) {
+ if (pauseLeaderElection.get()) {
+ return;
+ }
updateAndGet(leaderElection, new LeaderElection(server, force)).start();
}
+ void setLeaderElectionPause(boolean pause) {
+ pauseLeaderElection.set(pause);
+ }
+
void shutdownLeaderElection() {
final LeaderElection election = leaderElection.getAndSet(null);
if (election != null) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 20e3127..4d95449 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -35,6 +35,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
+import org.apache.ratis.thirdparty.org.checkerframework.checker.units.qual.A;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
@@ -408,6 +409,33 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
}
}
+ @Test
+ public void testPauseResumeLeaderElection() throws Exception {
+ runWithNewCluster(3, this::runTestPauseResumeLeaderElection);
+ }
+
+ void runTestPauseResumeLeaderElection(CLUSTER cluster) throws IOException,
InterruptedException {
+ RaftServer.Division leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
+ Assert.assertTrue(followers.size() >= 1);
+ final RaftServerImpl f1 = (RaftServerImpl)followers.get(0);
+ f1.setLeaderElectionPause(true);
+ try (RaftClient client = cluster.createClient(leader.getId())) {
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ RaftServer.Division newLeader = followers.get(0);
+ List<RaftPeer> peers = cluster.getPeers();
+ List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers,
newLeader.getPeer());
+ RaftClientReply reply =
client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+ Assert.assertTrue(reply.isSuccess());
+ JavaUtils.attempt(() -> Assert.assertEquals(leaderId, leader.getId()),
+ 20, HUNDRED_MILLIS, "check leader id", LOG);
+ f1.setLeaderElectionPause(false);
+ JavaUtils.attempt(() -> Assert.assertEquals(f1.getId(),
cluster.getLeader().getId()),
+ 20, HUNDRED_MILLIS, "check new leader", LOG);
+ }
+ }
+
private static RaftServerImpl createMockServer(boolean alive) {
final DivisionInfo info = mock(DivisionInfo.class);
when(info.isAlive()).thenReturn(alive);