This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f0190f5  RATIS-1487. Implement pause election in server (#577)
f0190f5 is described below

commit f0190f57137a5caeffd28d4a5b265e37c74eac17
Author: Yaolong Liu <[email protected]>
AuthorDate: Tue Jan 18 17:47:06 2022 +0800

    RATIS-1487. Implement pause election in server (#577)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  5 ++++
 .../org/apache/ratis/server/impl/RoleInfo.java     |  9 +++++++
 .../ratis/server/impl/LeaderElectionTests.java     | 28 ++++++++++++++++++++++
 3 files changed, 42 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d7d45b5..21ed539 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1388,6 +1388,11 @@ class RaftServerImpl implements RaftServer.Division,
     return reply;
   }
 
+  void setLeaderElectionPause(boolean pause) throws ServerNotReadyException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    role.setLeaderElectionPause(pause);
+  }
+
   boolean pause() throws IOException {
     // TODO: should pause() be limited on only working for a follower?
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 617b617..d09c801 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -44,6 +45,7 @@ class RoleInfo {
   private final AtomicReference<FollowerState> followerState = new AtomicReference<>();
   /** Used when the peer is candidate, to request votes from other peers */
   private final AtomicReference<LeaderElection> leaderElection = new AtomicReference<>();
+  private final AtomicBoolean pauseLeaderElection = new AtomicBoolean(false);
 
   private final AtomicReference<Timestamp> transitionTime;
 
@@ -112,9 +114,16 @@ class RoleInfo {
   }
 
   void startLeaderElection(RaftServerImpl server, boolean force) {
+    if (pauseLeaderElection.get()) {
+      return;
+    }
     updateAndGet(leaderElection, new LeaderElection(server, force)).start();
   }
 
+  void setLeaderElectionPause(boolean pause) {
+    pauseLeaderElection.set(pause);
+  }
+
   void shutdownLeaderElection() {
     final LeaderElection election = leaderElection.getAndSet(null);
     if (election != null) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 20e3127..4d95449 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -35,6 +35,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
+import org.apache.ratis.thirdparty.org.checkerframework.checker.units.qual.A;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
@@ -408,6 +409,33 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
+  @Test
+  public void testPauseResumeLeaderElection() throws Exception {
+    runWithNewCluster(3, this::runTestPauseResumeLeaderElection);
+  }
+
+  void runTestPauseResumeLeaderElection(CLUSTER cluster) throws IOException, InterruptedException {
+    RaftServer.Division leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    final List<RaftServer.Division> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    final RaftServerImpl f1 = (RaftServerImpl)followers.get(0);
+    f1.setLeaderElectionPause(true);
+    try (RaftClient client = cluster.createClient(leader.getId())) {
+      client.io().send(new RaftTestUtil.SimpleMessage("message"));
+      RaftServer.Division newLeader = followers.get(0);
+      List<RaftPeer> peers = cluster.getPeers();
+      List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
+      RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+      Assert.assertTrue(reply.isSuccess());
+      JavaUtils.attempt(() -> Assert.assertEquals(leaderId, leader.getId()),
+          20, HUNDRED_MILLIS, "check leader id", LOG);
+      f1.setLeaderElectionPause(false);
+      JavaUtils.attempt(() -> Assert.assertEquals(f1.getId(), cluster.getLeader().getId()),
+          20, HUNDRED_MILLIS, "check new leader", LOG);
+    }
+  }
+
   private static RaftServerImpl createMockServer(boolean alive) {
     final DivisionInfo info = mock(DivisionInfo.class);
     when(info.isAlive()).thenReturn(alive);

Reply via email to