This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit ce5357a2eac6aee4e824649776e916b6dbd0fe22 Author: Yaolong Liu <[email protected]> AuthorDate: Wed Jul 27 02:09:05 2022 +0800 RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697) (cherry picked from commit d3a0f9491f17462555c8fe522cbdc2ea4c88ef3b) --- .../ratis/server/impl/LeaderElectionTests.java | 88 ++++++++++++++++++++++ .../apache/ratis/server/impl/MiniRaftCluster.java | 6 ++ .../ratis/statemachine/SnapshotManagementTest.java | 30 ++++++++ 3 files changed, 124 insertions(+) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index b988d3f4f..6b5d04b24 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -23,6 +23,7 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.RatisMetricRegistry; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupMemberId; @@ -46,12 +47,16 @@ import org.junit.Test; import org.slf4j.Logger; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.ratis.RaftTestUtil.waitForLeader; import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME; @@ -312,6 +317,89 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> cluster.setBlockRequestsFrom(id.toString(), false); } + @Test + public void testAddListener() throws Exception { + try (final MiniRaftCluster cluster = newCluster(3)) { + cluster.start(); + final RaftServer.Division leader = waitForLeader(cluster); + try (RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + List<RaftPeer> servers = cluster.getPeers(); + Assert.assertEquals(servers.size(), 3); + MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1, + true, false, RaftProtos.RaftPeerRole.LISTENER); + RaftClientReply reply = client.admin().setConfiguration(servers, Arrays.asList(changes.newPeers)); + Assert.assertTrue(reply.isSuccess()); + Collection<RaftPeer> listener = + leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); + Assert.assertEquals(1, listener.size()); + Assert.assertEquals(changes.newPeers[0].getId(), new ArrayList<>(listener).get(0).getId()); + } + cluster.shutdown(); + } + } + + @Test + public void testRemoveListener() throws Exception { + try(final MiniRaftCluster cluster = newCluster(3,1)) { + cluster.start(); + final RaftServer.Division leader = waitForLeader(cluster); + try (RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + Assert.assertEquals(1, cluster.getListeners().size()); + List<RaftPeer> servers = cluster.getFollowers().stream().map(RaftServer.Division::getPeer).collect( + Collectors.toList()); + servers.add(leader.getPeer()); + RaftClientReply reply = client.admin().setConfiguration(servers); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(0, leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size()); + } + cluster.shutdown(); + } + } + + @Test + public void testChangeFollowerToListener() throws Exception { + try(final MiniRaftCluster cluster = newCluster(3)) { + cluster.start(); + final RaftServer.Division leader = waitForLeader(cluster); + try (RaftClient client = cluster.createClient()) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + List<RaftPeer> followers = cluster.getFollowers().stream().map( + RaftServer.Division::getPeer).collect(Collectors.toList()); + Assert.assertEquals(2, followers.size()); + List<RaftPeer> listeners = new ArrayList<>(); + listeners.add(followers.get(1)); + followers.remove(1); + RaftClientReply reply = client.admin().setConfiguration(followers, listeners); + Assert.assertTrue(reply.isSuccess()); + Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); + Assert.assertEquals(1, peer.size()); + Assert.assertEquals(listeners.get(0).getId(), new ArrayList<>(peer).get(0).getId()); + } + cluster.shutdown(); + } + } + + @Test + public void testChangeListenerToFollower() throws Exception { + try(final MiniRaftCluster cluster = newCluster(2, 1)) { + cluster.start(); + final RaftServer.Division leader = waitForLeader(cluster); + try (RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + List<RaftPeer> listeners = cluster.getListeners() + .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList()); + Assert.assertEquals(listeners.size(), 1); + RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers()); + Assert.assertTrue(reply.isSuccess()); + Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); + Assert.assertEquals(0, peer.size()); + } + cluster.shutdown(); + } + } + @Test public void testLeaderElectionMetrics() throws IOException, InterruptedException { Timestamp timestamp = Timestamp.currentTime(); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index f5cd38b36..1f4047524 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -656,6 +656,12 @@ public abstract class MiniRaftCluster implements Closeable { .collect(Collectors.toList()); } + public List<RaftServer.Division> getListeners() { + return getServerAliveStream() + .filter(server -> server.getInfo().isListener()) + .collect(Collectors.toList()); + } + public int getNumServers() { return servers.size(); } diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java index 608407786..c821f36c4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java @@ -143,4 +143,34 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster> .getStateMachineStorage().getSnapshotFile(follower.getInfo().getCurrentTerm(), snapshotIndex); Assert.assertTrue(snapshotFile.exists()); } + + + @Test + public void testReceiveLogAndTakeSnapshotOnListener() throws Exception { + runWithNewCluster(2, 1, this::runTestReceiveLogAndTakeSnapshotOnListener); + } + + void runTestReceiveLogAndTakeSnapshotOnListener(CLUSTER cluster) throws Exception { + final RaftClientReply snapshotReply; + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final RaftServer.Division listener = cluster.getListeners().get(0); + final RaftPeerId listenerId = listener.getId(); + Assert.assertTrue(listener.getInfo().isListener()); + try (final RaftClient client = cluster.createClient(listenerId)) { + for (int i = 0; i < RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) { + RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + snapshotReply = client.getSnapshotManagementApi(listenerId).create(3000); + } + + Assert.assertTrue(snapshotReply.isSuccess()); + final long snapshotIndex = snapshotReply.getLogIndex(); + LOG.info("snapshotIndex = {} on {} server {}", + snapshotIndex, listener.getInfo().getCurrentRole(), listener.getId()); + + final File snapshotFile = SimpleStateMachine4Testing.get(listener) + .getStateMachineStorage().getSnapshotFile(listener.getInfo().getCurrentTerm(), snapshotIndex); + Assert.assertTrue(snapshotFile.exists()); + } }
