szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494480462
##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false,
cluster, LOG));
}
+ @Test
+ public void testPauseResume() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+ }
+
+ void runTestPauseResume(CLUSTER cluster) throws InterruptedException,
IOException {
+ // wait leader be elected.
+ RaftServerImpl leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertTrue(followers.size() >= 1);
+ RaftServerImpl follower = followers.get(0);
+
+ // keep sending messages to the leader.
+ SimpleMessage[] messages = SimpleMessage.create(100);
+ Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster,
leaderId, messages);
+ // pause follower.
+ boolean isSuccess = follower.pause();
+ Assert.assertTrue(isSuccess);
+ Assert.assertTrue(follower.isPausingOrPaused());
+
+ writeThread.join();
+
+ RaftLog leaderLog = leader.getState().getLog();
+ // leader should contain all logs.
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, messages));
+ RaftLog followerLog = follower.getState().getLog();
+ // follower should contain less messages because it was paused already.
+ Assert.assertTrue(followerLog.getEntries(0, messages.length).length <
messages.length);
Review comment:
This is incorrect since there are entries other than the messages.
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto
installSnapshot(InstallSnapshotRequestProto req
return reply;
}
+ public boolean pause() throws IOException {
+ // TODO: should pause() be limited on only working for a follower?
+
+ // Now the state of lifeCycle should be PAUSING, which will prevent future
other operations.
+ // Pause() should pause ongoing operations:
+ // a. call {@link StateMachine#pause()}.
+ synchronized (this) {
+ if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+ return false;
+ }
+ // TODO: any other operations that needs to be paused?
+ stateMachine.pause();
+ lifeCycle.compareAndTransition(PAUSING, PAUSED);
+ }
+ return true;
+ }
+
+ public boolean resume() {
+ synchronized (this) {
+ if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+ return false;
+ }
+ // TODO: any other operations that needs to be resumed?
+ try {
+ stateMachine.reinitialize();
+ } catch (IOException e) {
+ LOG.warn("Failed to reinitialize statemachine: {}",
stateMachine.toString());
+ lifeCycle.compareAndTransition(STARTING, PAUSED);
Review comment:
The life cycle cannot transition from STARTING to PAUSED. It must
transition to EXCEPTION in this case.
##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false,
cluster, LOG));
}
+ @Test
+ public void testPauseResume() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+ }
+
+ void runTestPauseResume(CLUSTER cluster) throws InterruptedException,
IOException {
+ // wait leader be elected.
+ RaftServerImpl leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertTrue(followers.size() >= 1);
+ RaftServerImpl follower = followers.get(0);
+
+ // keep sending messages to the leader.
+ SimpleMessage[] messages = SimpleMessage.create(100);
Review comment:
Create two sets of messages. Send one set before the pause and then send the
other set after the pause. The paused follower should have all messages from the
first set but none from the second set.
##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false,
cluster, LOG));
}
+ @Test
+ public void testPauseResume() throws Exception {
Review comment:
Let's move this to a new test since Pause-Resume is not a basic feature.
:)
##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false,
cluster, LOG));
}
+ @Test
+ public void testPauseResume() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+ }
+
+ void runTestPauseResume(CLUSTER cluster) throws InterruptedException,
IOException {
+ // wait leader be elected.
+ RaftServerImpl leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertTrue(followers.size() >= 1);
+ RaftServerImpl follower = followers.get(0);
+
+ // keep sending messages to the leader.
+ SimpleMessage[] messages = SimpleMessage.create(100);
+ Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster,
leaderId, messages);
+ // pause follower.
+ boolean isSuccess = follower.pause();
+ Assert.assertTrue(isSuccess);
+ Assert.assertTrue(follower.isPausingOrPaused());
+
+ writeThread.join();
+
Review comment:
Sleep for some time before comparing the log.
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto
installSnapshot(InstallSnapshotRequestProto req
return reply;
}
+ public boolean pause() throws IOException {
+ // TODO: should pause() be limited on only working for a follower?
+
+ // Now the state of lifeCycle should be PAUSING, which will prevent future
other operations.
+ // Pause() should pause ongoing operations:
+ // a. call {@link StateMachine#pause()}.
+ synchronized (this) {
+ if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+ return false;
+ }
+ // TODO: any other operations that needs to be paused?
+ stateMachine.pause();
+ lifeCycle.compareAndTransition(PAUSING, PAUSED);
+ }
+ return true;
+ }
+
+ public boolean resume() {
+ synchronized (this) {
+ if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+ return false;
+ }
+ // TODO: any other operations that needs to be resumed?
+ try {
+ stateMachine.reinitialize();
+ } catch (IOException e) {
+ LOG.warn("Failed to reinitialize statemachine: {}",
stateMachine.toString());
+ lifeCycle.compareAndTransition(STARTING, PAUSED);
Review comment:
It should also rethrow the exception.
##########
File path:
ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+ public static final int NUM_SERVERS = 3;
+
+ @Test
+ public void testPauseResume() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+ }
+
+ void runTestPauseResume(CLUSTER cluster) throws InterruptedException,
IOException {
+ // wait leader be elected.
+ RaftServerImpl leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertTrue(followers.size() >= 1);
+ RaftServerImpl follower = followers.get(0);
+
+ SimpleMessage[] batch1 = SimpleMessage.create(100);
+ Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster,
leaderId, batch1);
+
+ writeThread.join();
+ Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+ RaftLog leaderLog = leader.getState().getLog();
+ // leader should contain all logs.
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+ RaftLog followerLog = follower.getState().getLog();
+ // leader should contain all logs.
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+ // pause follower.
+ boolean isSuccess = follower.pause();
+ Assert.assertTrue(isSuccess);
+ Assert.assertTrue(follower.isPausingOrPaused());
+
+ SimpleMessage[] batch2 = SimpleMessage.create(100);
+ Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster,
leaderId, batch2);
+
+ writeThread2.join();
+ Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+ // paused follower should not have any batch2 message in its raftlog.
+ Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+ }
Review comment:
We should call resume() and check whether the follower can then get batch2.
##########
File path:
ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+ public static final int NUM_SERVERS = 3;
+
+ @Test
+ public void testPauseResume() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+ }
+
+ void runTestPauseResume(CLUSTER cluster) throws InterruptedException,
IOException {
+ // wait leader be elected.
+ RaftServerImpl leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertTrue(followers.size() >= 1);
+ RaftServerImpl follower = followers.get(0);
+
+ SimpleMessage[] batch1 = SimpleMessage.create(100);
Review comment:
We should use SimpleMessage.create(int numMessages, String prefix) to
create batch1 and batch2 with different prefixes.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]