szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494480462



##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, 
cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, 
IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, 
leaderId, messages);
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    writeThread.join();
+
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, messages));
+    RaftLog followerLog = follower.getState().getLog();
+    // follower should contain less messages because it was paused already.
+    Assert.assertTrue(followerLog.getEntries(0, messages.length).length < 
messages.length);

Review comment:
       This is incorrect: the log contains entries other than the messages, so its entry count cannot be compared with messages.length.

##########
File path: 
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto 
installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {
+    // TODO: should pause() be limited on only working for a follower?
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future 
other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+      lifeCycle.compareAndTransition(PAUSING, PAUSED);
+    }
+    return true;
+  }
+
+  public boolean resume() {
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be resumed?
+      try {
+        stateMachine.reinitialize();
+      } catch (IOException e) {
+        LOG.warn("Failed to reinitialize statemachine: {}", 
stateMachine.toString());
+        lifeCycle.compareAndTransition(STARTING, PAUSED);

Review comment:
       The life cycle cannot be transitioned from STARTING to PAUSED.  It must be 
transitioned to EXCEPTION in this case.

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, 
cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, 
IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);

Review comment:
       Create two sets of messages.  Send one set before pausing and the other 
set after pausing.  The paused follower should have all messages from the 
first set but none from the second set.

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, 
cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {

Review comment:
       Let's move this to a new test class, since pause/resume is not a basic 
 feature. :)

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, 
cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, 
IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, 
leaderId, messages);
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    writeThread.join();
+

Review comment:
       Sleep for some time before comparing the logs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to