Repository: incubator-ratis Updated Branches: refs/heads/master ce5f48c41 -> 0b7337083
Revert "r371" This reverts commit 4cb6b20237d7bf95a2c1acbaeec0ab58bc2a0074. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0b733708 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0b733708 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0b733708 Branch: refs/heads/master Commit: 0b7337083600f584624384c05bf5b6236e0f9dbe Parents: ce5f48c Author: Tsz Wo Nicholas Sze <szets...@apache.org> Authored: Thu Oct 25 06:56:16 2018 +0800 Committer: Tsz Wo Nicholas Sze <szets...@apache.org> Committed: Thu Oct 25 06:57:34 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/TestRestartRaftPeer.java | 106 +++++++++++ .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 9 +- .../ratis/grpc/TestServerRestartWithGrpc.java | 25 --- .../ratis/netty/TestServerRestartWithNetty.java | 25 --- .../java/org/apache/ratis/RaftAsyncTests.java | 6 +- .../java/org/apache/ratis/RaftTestUtil.java | 18 +- .../java/org/apache/ratis/RetryCacheTests.java | 19 +- .../org/apache/ratis/WatchRequestTests.java | 188 +++++++++---------- .../apache/ratis/server/ServerRestartTests.java | 110 ----------- .../apache/ratis/server/TestRaftLogMetrics.java | 69 ++++--- .../impl/RaftReconfigurationBaseTest.java | 13 +- .../ratis/server/impl/RaftServerTestUtil.java | 11 +- .../impl/RaftStateMachineExceptionTests.java | 9 +- .../server/impl/StateMachineShutdownTests.java | 2 +- .../TestServerRestartWithSimulatedRpc.java | 25 --- .../server/storage/RaftStorageTestUtils.java | 9 +- .../statemachine/RaftSnapshotBaseTest.java | 75 +++----- .../SimpleStateMachine4Testing.java | 78 +++----- 18 files changed, 325 insertions(+), 472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java new file mode 100644 index 0000000..ccbbda0 --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.examples.ParameterizedBaseTest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.SizeInBytes; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +/** + * Test restarting raft peers. + */ +@RunWith(Parameterized.class) +public class TestRestartRaftPeer extends BaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + @Parameterized.Parameters + public static Collection<Object[]> data() throws IOException { + RaftProperties prop = new RaftProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); + return ParameterizedBaseTest.getMiniRaftClusters(prop, 3); + } + + @Parameterized.Parameter + public MiniRaftCluster cluster; + + @Test + public void restartFollower() throws Exception { + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); + + // write some messages + final byte[] content = new byte[1024]; + Arrays.fill(content, (byte) 1); + final SimpleMessage message = new SimpleMessage(new String(content)); + for (int i = 0; i < 10; i++) { + Assert.assertTrue(client.send(message).isSuccess()); + } + + // restart a follower + RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + LOG.info("Restart follower {}", followerId); + cluster.restartServer(followerId, false); + + // write some more messages + for (int i = 0; i < 10; i++) { + Assert.assertTrue(client.send(message).isSuccess()); + } + client.close(); + + // make sure the restarted follower can catchup + boolean catchup = false; + long lastAppliedIndex = 0; + for (int i = 0; i < 10 && !catchup; i++) { + Thread.sleep(500); + lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex(); + catchup = lastAppliedIndex >= 20; + } + Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup); + + // make sure the restarted peer's log segments is correct + cluster.restartServer(followerId, false); + Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog() + .getLastEntryTermIndex().getIndex() >= 20); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index d98be53..7ae385d 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -92,17 +92,16 @@ public class TestRaftWithGrpc Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index); if (!raftServer.isLeader()) { TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE); - Assert.assertArrayEquals(serverEntries, leaderEntries); + Arrays.equals(serverEntries, leaderEntries); } }); // Wait for heartbeats from leader to be received by followers - Thread.sleep(500); + Thread.sleep(1000); RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> { // FollowerInfo in the leader state should have updated next and match index. - final long followerMatchIndex = logAppender.getFollower().getMatchIndex(); - Assert.assertTrue(followerMatchIndex >= index - 1); - Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex()); + Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 1); + Assert.assertEquals(logAppender.getFollower().getNextIndex(), index); }); } cluster.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java deleted file mode 100644 index 682b3ba..0000000 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.server.ServerRestartTests; - -public class TestServerRestartWithGrpc - extends ServerRestartTests<MiniRaftClusterWithGrpc> - implements MiniRaftClusterWithGrpc.FactoryGet { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java deleted file mode 100644 index 15dc688..0000000 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.netty; - -import org.apache.ratis.server.ServerRestartTests; - -public class TestServerRestartWithNetty - extends ServerRestartTests<MiniRaftClusterWithNetty> - implements MiniRaftClusterWithNetty.FactoryGet { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index c14515c..f79eb6b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -192,7 +192,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba // submit some messages final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(); for (int i = 0; i < numMesssages; i++) { - final String s = "" + i; + final String s = "m" + i; LOG.info("sendAsync " + s); futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s))); } @@ -218,12 +218,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba // test a failure case testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index", () -> client.sendStaleReadAsync( - new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE), + new RaftTestUtil.SimpleMessage("" + (numMesssages + 1)), followerCommitInfo.getCommitIndex(), follower), StateMachineException.class, IndexOutOfBoundsException.class); // test sendStaleReadAsync - for (int i = 0; i < numMesssages; i++) { + for (int i = 1; i < followerCommitInfo.getCommitIndex(); i++) { final int query = i; LOG.info("sendStaleReadAsync, query=" + query); final Message message = new RaftTestUtil.SimpleMessage("" + query); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 5946a47..60629f9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -113,14 +113,11 @@ public interface RaftTestUtil { return leader != null ? leader.getId().toString() : null; } - static boolean logEntriesContains(RaftLog log, SimpleMessage... expectedMessages) { - return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages); - } - - static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) { + static boolean logEntriesContains(RaftLog log, + SimpleMessage... expectedMessages) { int idxEntries = 0; int idxExpected = 0; - TermIndex[] termIndices = log.getEntries(startIndex, endIndex); + TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE); while (idxEntries < termIndices.length && idxExpected < expectedMessages.length) { try { @@ -379,13 +376,4 @@ public interface RaftTestUtil { } }).start(); } - - static void assertSameLog(RaftLog expected, RaftLog computed) throws Exception { - Assert.assertEquals(expected.getLastEntryTermIndex(), computed.getLastEntryTermIndex()); - final long lastIndex = expected.getNextIndex() - 1; - Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(), lastIndex); - for(long i = 0; i < lastIndex; i++) { - Assert.assertEquals(expected.get(i), computed.get(i)); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index c962481..9fdb4f7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -29,8 +29,6 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerTestUtil; -import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.server.storage.RaftLogIOException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -38,7 +36,6 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; -import java.util.stream.LongStream; import static java.util.Arrays.asList; @@ -113,21 +110,10 @@ public abstract class RetryCacheTests extends BaseTest { Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server)); Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId)); // make sure there is only one log entry committed - Assert.assertEquals(1, count(server.getState().getLog(), oldLastApplied + 1)); + Assert.assertEquals(oldLastApplied + 1, server.getState().getLastAppliedIndex()); } } - static int count(RaftLog log, long startIndex) throws RaftLogIOException { - final long nextIndex = log.getNextIndex(); - int count = 0; - for(long i = startIndex; i < nextIndex; i++) { - if (log.get(i).hasStateMachineLogEntry()) { - count++; - } - } - return count; - } - /** * Test retry while the leader changes to another peer */ @@ -172,7 +158,8 @@ public abstract class RetryCacheTests extends BaseTest { } // check the new leader and make sure the retry did not get committed - Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1)); + Assert.assertEquals(oldLastApplied + 3, + cluster.getLeader().getState().getLastAppliedIndex()); client.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index d1cb7e0..9ff27ad 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -41,8 +41,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> extends BaseTest @@ -73,6 +71,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } static class TestParameters { + final long startLogIndex; final int numMessages; final RaftClient writeClient; final RaftClient watchMajorityClient; @@ -82,10 +81,12 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final MiniRaftCluster cluster; final Logger log; - TestParameters(int numMessages, RaftClient writeClient, + TestParameters( + long startLogIndex, int numMessages, RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, MiniRaftCluster cluster, Logger log) { + this.startLogIndex = startLogIndex; this.numMessages = numMessages; this.writeClient = writeClient; this.watchMajorityClient = watchMajorityClient; @@ -96,31 +97,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> this.log = log; } - void sendRequests(List<CompletableFuture<RaftClientReply>> replies, - List<CompletableFuture<WatchReplies>> watches) { - for(int i = 0; i < numMessages; i++) { - final String message = "m" + i; - log.info("SEND_REQUEST {}: message={}", i, message); - final CompletableFuture<RaftClientReply> replyFuture = writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)); - replies.add(replyFuture); - final CompletableFuture<WatchReplies> watchFuture = new CompletableFuture<>(); - watches.add(watchFuture); - replyFuture.thenAccept(reply -> { - final long logIndex = reply.getLogIndex(); - log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex); - watchFuture.complete(new WatchReplies(logIndex, - watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY), - watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL), - watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED), - watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED) - )); - }); - } - } - @Override public String toString() { - return "numMessages=" + numMessages; + return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages; } } @@ -140,9 +119,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) { final int[] numMessages = {1, 10, 100}; for(int i = 0; i < 5; i++) { + final long logIndex = getLogIndex(writeClient) + 1; final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)]; final TestParameters p = new TestParameters( - n, writeClient, watchMajorityClient, watchAllClient, + logIndex, n, writeClient, watchMajorityClient, watchAllClient, watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG); LOG.info("{}) {}, {}", i, p, cluster.printServers()); testCase.apply(p); @@ -151,29 +131,18 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } - static class WatchReplies { - private final long logIndex; - private final CompletableFuture<RaftClientReply> majority; - private final CompletableFuture<RaftClientReply> all; - private final CompletableFuture<RaftClientReply> majorityCommitted; - private final CompletableFuture<RaftClientReply> allCommitted; - - WatchReplies(long logIndex, - CompletableFuture<RaftClientReply> majority, CompletableFuture<RaftClientReply> all, - CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted) { - this.logIndex = logIndex; - this.majority = majority; - this.all = all; - this.majorityCommitted = majorityCommitted; - this.allCommitted = allCommitted; - } - } - static Void runTestWatchRequestAsync(TestParameters p) throws Exception { - final Logger LOG = p.log; - final MiniRaftCluster cluster = p.cluster; - final int numMessages = p.numMessages; + runTestWatchRequestAsync(p.startLogIndex, p.numMessages, + p.writeClient, p.watchMajorityClient, p.watchAllClient, + p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log); + return null; + } + static void runTestWatchRequestAsync( + long startLogIndex, int numMessages, + RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, + RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, + MiniRaftCluster cluster, Logger LOG) throws Exception { // blockStartTransaction of the leader so that no transaction can be committed MAJORITY final RaftServerImpl leader = cluster.getLeader(); LOG.info("block leader {}", leader.getId()); @@ -187,35 +156,52 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> // send a message final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); - final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); - p.sendRequests(replies, watches); + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + final String message = "m" + logIndex; + LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message); + replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); + watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); + watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); + watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync( + logIndex, ReplicationLevel.MAJORITY_COMMITTED)); + watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); + } Assert.assertEquals(numMessages, replies.size()); - Assert.assertEquals(numMessages, watches.size()); + Assert.assertEquals(numMessages, watchMajoritys.size()); + Assert.assertEquals(numMessages, watchAlls.size()); + Assert.assertEquals(numMessages, watchMajorityCommitteds.size()); + Assert.assertEquals(numMessages, watchAllCommitteds.size()); // since leader is blocked, nothing can be done. TimeUnit.SECONDS.sleep(1); assertNotDone(replies); - assertNotDone(watches); + assertNotDone(watchMajoritys); + assertNotDone(watchAlls); + assertNotDone(watchMajorityCommitteds); + assertNotDone(watchAllCommitteds); // unblock leader so that the transaction can be committed. SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); LOG.info("unblock leader {}", leader.getId()); for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex); final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - final long logIndex = reply.getLogIndex(); - LOG.info("{}: receive reply for logIndex={}", i, logIndex); Assert.assertTrue(reply.isSuccess()); - - final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - Assert.assertEquals(logIndex, watchReplies.logIndex); - final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertEquals(logIndex, reply.getLogIndex()); + final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); - Assert.assertTrue(watchMajorityReply.isSuccess()); + Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); final RaftClientReply watchMajorityCommittedReply - = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos @@ -233,25 +219,22 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } - Assert.assertEquals(numMessages, watches.size()); - // but not replicated/committed to all. TimeUnit.SECONDS.sleep(1); - assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); - assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); + assertNotDone(watchAlls); + assertNotDone(watchAllCommitteds); // unblock follower so that the transaction can be replicated and committed to all. LOG.info("unblock follower {}", blockedFollower.getId()); SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); for(int i = 0; i < numMessages; i++) { - final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - final long logIndex = watchReplies.logIndex; + final long logIndex = startLogIndex + i; LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); - final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); Assert.assertTrue(watchAllReply.isSuccess()); - final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos @@ -260,14 +243,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); } } - return null; } static <T> void assertNotDone(List<CompletableFuture<T>> futures) { - assertNotDone(futures.stream()); - } - - static <T> void assertNotDone(Stream<CompletableFuture<T>> futures) { futures.forEach(f -> { if (f.isDone()) { try { @@ -289,44 +267,65 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { - final Logger LOG = p.log; - final MiniRaftCluster cluster = p.cluster; - final int numMessages = p.numMessages; + runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages, + p.writeClient, p.watchMajorityClient, p.watchAllClient, + p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log); + return null; + } + static void runTestWatchRequestAsyncChangeLeader( + long startLogIndex, int numMessages, + RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, + RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, + MiniRaftCluster cluster, Logger LOG) throws Exception { // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED final List<RaftServerImpl> followers = cluster.getFollowers(); final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size())); LOG.info("block follower {}", blockedFollower.getId()); SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData(); + // send a message final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); - final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); - p.sendRequests(replies, watches); + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + final String message = "m" + logIndex; + LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message); + replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); + watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); + watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); + watchMajorityCommitteds.add( + watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED)); + watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); + } Assert.assertEquals(numMessages, replies.size()); - Assert.assertEquals(numMessages, watches.size()); + Assert.assertEquals(numMessages, watchMajoritys.size()); + Assert.assertEquals(numMessages, watchAlls.size()); + Assert.assertEquals(numMessages, watchMajorityCommitteds.size()); + Assert.assertEquals(numMessages, watchAllCommitteds.size()); // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED. for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex); final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - final long logIndex = reply.getLogIndex(); - LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex); Assert.assertTrue(reply.isSuccess()); - - final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - Assert.assertEquals(logIndex, watchReplies.logIndex); - final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertEquals(logIndex, reply.getLogIndex()); + final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); - Assert.assertTrue(watchMajorityReply.isSuccess()); + Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); final RaftClientReply watchMajorityCommittedReply - = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); - LOG.info("commitInfos=" + commitInfos); Assert.assertEquals(NUM_SERVERS, commitInfos.size()); // One follower has not committed, so min must be less than logIndex @@ -340,8 +339,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } TimeUnit.SECONDS.sleep(1); - assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); - assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); + assertNotDone(watchAlls); + assertNotDone(watchAllCommitteds); // Now change leader RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId()); @@ -350,14 +349,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); LOG.info("unblock follower {}", blockedFollower.getId()); for(int i = 0; i < numMessages; i++) { - final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - final long logIndex = watchReplies.logIndex; + final long logIndex = startLogIndex + i; LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); - final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); Assert.assertTrue(watchAllReply.isSuccess()); - final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos @@ -366,6 +364,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); } } - return null; } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java deleted file mode 100644 index 5353caa..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server; - -import org.apache.log4j.Level; -import org.apache.ratis.BaseTest; -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.RaftTestUtil.SimpleMessage; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ServerState; -import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.statemachine.SimpleStateMachine4Testing; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.SizeInBytes; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Test restarting raft peers. - */ -public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> - extends BaseTest - implements MiniRaftCluster.Factory.Get<CLUSTER> { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); - } - - static final int NUM_SERVERS = 3; - - @Before - public void setup() { - final RaftProperties prop = getProperties(); - prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SimpleStateMachine4Testing.class, StateMachine.class); - RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); - } - - @Test - public void testRestartFollower() throws Exception { - try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) { - runTestRestartFollower(cluster, LOG); - } - } - - static void runTestRestartFollower(MiniRaftCluster cluster, Logger LOG) throws Exception { - cluster.start(); - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); - - // write some messages - final byte[] content = new byte[1024]; - Arrays.fill(content, (byte)1); - final SimpleMessage message = new SimpleMessage(new String(content)); - for(int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - - // restart a follower - RaftPeerId followerId = cluster.getFollowers().get(0).getId(); - LOG.info("Restart follower {}", followerId); - cluster.restartServer(followerId, false); - - // write some more messages - for(int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - client.close(); - - final long leaderLastIndex = cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex(); - // make sure the restarted follower can catchup - final ServerState followerState = cluster.getRaftServerImpl(followerId).getState(); - JavaUtils.attempt(() -> followerState.getLastAppliedIndex() >= leaderLastIndex, - 10, 500, "follower catchup", LOG); - - // make sure the restarted peer's log segments is correct - cluster.restartServer(followerId, false); - Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog() - .getLastEntryTermIndex().getIndex() >= leaderLastIndex); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java index 9cc60a6..978800d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java @@ -20,67 +20,61 @@ package org.apache.ratis.server; import com.codahale.metrics.Timer; import org.apache.log4j.Level; -import org.apache.ratis.BaseTest; -import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.RatisMetricsRegistry; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; -import org.apache.ratis.server.storage.RaftStorageTestUtils; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.util.LogUtils; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import javax.management.ObjectName; +import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; -public class TestRaftLogMetrics extends BaseTest - implements MiniRaftClusterWithSimulatedRpc.FactoryGet { +public class TestRaftLogMetrics { { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } public static final int NUM_SERVERS = 3; - { - getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - MetricsStateMachine.class, StateMachine.class); - } + protected static final RaftProperties properties = new RaftProperties(); - static class MetricsStateMachine extends BaseStateMachine { - static MetricsStateMachine get(RaftServerImpl s) { - return (MetricsStateMachine)s.getStateMachine(); - } + private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc + .FACTORY.newCluster(NUM_SERVERS, getProperties()); - private final AtomicInteger flushCount = new AtomicInteger(); + public RaftProperties getProperties() { + return properties; + } - int getFlushCount() { - return flushCount.get(); - } + @Before + public void setup() throws IOException { + Assert.assertNull(cluster.getLeader()); + cluster.start(); + } - @Override - public CompletableFuture<Void> flushStateMachineData(long index) { - flushCount.incrementAndGet(); - return super.flushStateMachineData(index); + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); } } - @Test - public void testFlushMetric() throws Exception { - try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTestFlushMetric(cluster); - } + private String getLogFlushTimeMetric(String serverId) { + return new StringBuilder("org.apache.ratis.server.storage.RaftLogWorker.") + .append(serverId).append(".flush-time").toString(); } - static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception { + @Test + public void testFlushMetric() throws Exception { int numMsg = 2; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg); @@ -91,21 +85,22 @@ public class TestRaftLogMetrics extends BaseTest } for (RaftServerProxy rsp: cluster.getServers()) { - final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId()); + String flushTimeMetric = getLogFlushTimeMetric(rsp.getId().toString()); Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric); Assert.assertNotNull(tm); - final MetricsStateMachine stateMachine = MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId())); - final int expectedFlush = stateMachine.getFlushCount(); + // Number of log entries expected = numMsg + 1 entry for start-log-segment + int numExpectedLogEntries = numMsg + 1; - Assert.assertEquals(expectedFlush, tm.getCount()); + Assert.assertEquals(numExpectedLogEntries, tm.getCount()); Assert.assertTrue(tm.getMeanRate() > 0); // Test jmx ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric); - Assert.assertEquals(expectedFlush, + Assert.assertEquals(numExpectedLogEntries, ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count")) .intValue()); } } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 246a9a2..e9651d6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -61,6 +61,8 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { protected static final RaftProperties prop = new RaftProperties(); + private static final ClientId clientId = ClientId.randomId(); + static final int STAGING_CATCHUP_GAP = 10; @BeforeClass public static void setup() { @@ -414,16 +416,17 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final RaftServerImpl leader = cluster.getLeader(); - final RaftClient client = cluster.createClient(leader.getId()); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); client.send(new SimpleMessage("m")); - final RaftLog leaderLog = leader.getState().getLog(); - final long committedIndex = leaderLog.getLastCommittedIndex(); + final long committedIndex = cluster.getLeader().getState().getLog() + .getLastCommittedIndex(); final RaftConfiguration confBefore = cluster.getLeader().getRaftConf(); // no real configuration change in the request - final RaftClientReply reply = client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray())); + RaftClientReply reply = client.setConfiguration(cluster.getPeers() + .toArray(new RaftPeer[0])); Assert.assertTrue(reply.isSuccess()); Assert.assertEquals(committedIndex, cluster.getLeader().getState() .getLog().getLastCommittedIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 827117e..bcfaf01 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -41,7 +41,8 @@ public class RaftServerTestUtil { 3, sleepMs, "waitAndCheckNewConf", LOG); } private static void waitAndCheckNewConf(MiniRaftCluster cluster, - RaftPeer[] peers, Collection<String> deadPeers) { + RaftPeer[] peers, Collection<String> deadPeers) + throws Exception { LOG.info(cluster.printServers()); Assert.assertNotNull(cluster.getLeader()); @@ -60,11 +61,9 @@ public class RaftServerTestUtil { numIncluded++; Assert.assertTrue(server.getRaftConf().isStable()); Assert.assertTrue(server.getRaftConf().hasNoChange(peers)); - } else if (server.isAlive()) { - // The server is successfully removed from the conf - // It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf). - Assert.assertTrue(server.getRaftConf().isStable()); - Assert.assertFalse(server.getRaftConf().containsInConf(server.getId())); + } else { + Assert.assertFalse(server.getId() + " is still running: " + server, + server.isAlive()); } } Assert.assertEquals(peers.length, numIncluded + deadIncluded); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index cf3a490..ec635d0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -21,7 +21,6 @@ import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; @@ -108,8 +107,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; final long seqNum = 111; - final SimpleMessage message = new SimpleMessage("message"); - final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, message); + RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, + callId, seqNum, new RaftTestUtil.SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertFalse(reply.isSuccess()); Assert.assertNotNull(reply.getStateMachineException()); @@ -132,8 +131,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu } Assert.assertNotNull( RaftServerTestUtil.getRetryEntry(server, client.getId(), callId)); - final RaftLog log = server.getState().getLog(); - RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message); + Assert.assertEquals(oldLastApplied + 1, + server.getState().getLastAppliedIndex()); } client.close(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java index e566700..a66cf70 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java @@ -97,7 +97,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster> RaftClientReply watchReply = client.sendWatch( logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED); watchReply.getCommitInfos().forEach( - val -> Assert.assertTrue(val.getCommitIndex() >= logIndex)); + val -> Assert.assertEquals(val.getCommitIndex(), logIndex)); RaftServerImpl secondFollower = cluster.getFollowers().get(1); // Second follower is blocked in apply transaction http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java deleted file mode 100644 index 306e5e7..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.simulation; - -import org.apache.ratis.server.ServerRestartTests; - -public class TestServerRestartWithSimulatedRpc - extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc> - implements MiniRaftClusterWithSimulatedRpc.FactoryGet { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java index e681b66..ad8308e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java @@ -18,8 +18,6 @@ package org.apache.ratis.server.storage; import org.apache.log4j.Level; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.util.AutoCloseableLock; @@ -32,10 +30,6 @@ public interface RaftStorageTestUtils { LogUtils.setLogLevel(RaftLogWorker.LOG, level); } - static String getLogFlushTimeMetric(RaftPeerId serverId) { - return RaftLogWorker.class.getName() + "." + serverId + ".flush-time"; - } - static void printLog(RaftLog log, Consumer<String> println) { if (log == null) { println.accept("log == null"); @@ -56,7 +50,8 @@ public interface RaftStorageTestUtils { b.append(i == committed? 'c': ' '); b.append(String.format("%3d: ", i)); try { - b.append(ServerProtoUtils.toLogEntryString(log.get(i))); + final RaftProtos.LogEntryProto entry = log.get(i); + b.append(entry != null? entry.getLogEntryBodyCase(): null); } catch (RaftLogIOException e) { b.append(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 0a5e38d..7a326a3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -33,9 +33,7 @@ import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftStorageDirectory; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.After; import org.junit.Assert; @@ -47,8 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.LongStream; public abstract class RaftSnapshotBaseTest extends BaseTest { static { @@ -60,31 +56,25 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class); private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10; - static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) { + static File getSnapshotFile(MiniRaftCluster cluster, int i) { final RaftServerImpl leader = cluster.getLeader(); - final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage(); - final long term = leader.getState().getCurrentTerm(); - return LongStream.range(startIndex, endIndex) - .mapToObj(i -> storage.getSnapshotFile(term, i)) - .collect(Collectors.toList()); + final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader); + return sm.getStateMachineStorage().getSnapshotFile( + leader.getState().getCurrentTerm(), i); } - - static void assertLeaderContent(MiniRaftCluster cluster) throws Exception { + static void assertLeaderContent(MiniRaftCluster cluster) + throws InterruptedException { final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); - final RaftLog leaderLog = leader.getState().getLog(); - final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex(); - final LogEntryProto e = leaderLog.get(lastIndex); - + Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2, + leader.getState().getLog().getLastCommittedIndex()); final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); - long message = 0; - for (int i = 0; i < entries.length; i++) { - LOG.info("{}) {} {}", i, message, entries[i]); - if (entries[i].hasStateMachineLogEntry()) { - final SimpleMessage m = new SimpleMessage("m" + message++); - Assert.assertArrayEquals(m.getContent().toByteArray(), - entries[i].getStateMachineLogEntry().getLogData().toByteArray()); - } + + for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + Assert.assertEquals(i+1, entries[i].getIndex()); + Assert.assertArrayEquals( + new SimpleMessage("m" + i).getContent().toByteArray(), + entries[i].getStateMachineLogEntry().getLogData().toByteArray()); } } @@ -128,12 +118,15 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { } } - final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex(); - LOG.info("nextIndex = {}", nextIndex); // wait for the snapshot to be done - final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); - JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists), - 10, 1000, "snapshotFile.exist", LOG); + final File snapshotFile = getSnapshotFile(cluster, i); + + int retries = 0; + do { + Thread.sleep(1000); + } while (!snapshotFile.exists() && retries++ < 10); + + Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); // restart the peer and check if it can correctly load snapshot cluster.restart(false); @@ -145,14 +138,6 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { } } - static boolean exists(File f) { - if (f.exists()) { - LOG.info("File exists: " + f); - return true; - } - return false; - } - /** * Basic test for install snapshot: start a one node cluster and let it * generate a snapshot. Then delete the log and restart the node, and add more @@ -160,7 +145,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { */ @Test public void testBasicInstallSnapshot() throws Exception { - final List<LogPathAndIndex> logs; + List<LogPathAndIndex> logs; try { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); @@ -176,13 +161,15 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { // wait for the snapshot to be done RaftStorageDirectory storageDirectory = cluster.getLeader().getState() .getStorage().getStorageDir(); - - final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex(); - LOG.info("nextIndex = {}", nextIndex); - final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); - JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists), - 10, 1000, "snapshotFile.exist", LOG); + final File snapshotFile = getSnapshotFile(cluster, i); logs = storageDirectory.getLogSegmentFiles(); + + int retries = 0; + do { + Thread.sleep(1000); + } while (!snapshotFile.exists() && retries++ < 10); + + Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); } finally { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 313e713..9a7267b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -20,15 +20,13 @@ package org.apache.ratis.statemachine; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.io.MD5Hash; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -36,29 +34,22 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogInputStream; import org.apache.ratis.server.storage.LogOutputStream; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.statemachine.impl.TransactionContextImpl; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.util.Daemon; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.LifeCycle; -import org.apache.ratis.util.MD5FileUtil; -import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Collections; -import java.util.EnumMap; -import java.util.Objects; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; /** * A {@link StateMachine} implementation example that simply stores all the log @@ -77,8 +68,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { return (SimpleStateMachine4Testing)s.getStateMachine(); } - private final SortedMap<Long, LogEntryProto> indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); - private final SortedMap<String, LogEntryProto> dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); + private final List<LogEntryProto> list = + Collections.synchronizedList(new ArrayList<>()); private final Daemon checkpointer; private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final RaftProperties properties = new RaftProperties(); @@ -128,14 +119,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { public SimpleStateMachine4Testing() { checkpointer = new Daemon(() -> { while (running) { - if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) { - endIndexLastCkpt = takeSnapshot(); - } - - try { - TimeUnit.SECONDS.sleep(1); - } catch(InterruptedException ignored) { - } + if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >= + SNAPSHOT_THRESHOLD) { + endIndexLastCkpt = takeSnapshot(); + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } } }); } @@ -148,12 +139,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { return leaderElectionTimeoutInfo; } - private void put(LogEntryProto entry) { - final LogEntryProto previous = indexMap.put(entry.getIndex(), entry); - Preconditions.assertNull(previous, "previous"); - dataMap.put(entry.getStateMachineLogEntry().getLogData().toStringUtf8(), entry); - } - @Override public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException { @@ -186,7 +171,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry()); - put(entry); + list.add(entry); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); return CompletableFuture.completedFuture( new SimpleMessage(entry.getIndex() + " OK")); @@ -207,7 +192,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { termIndex.getIndex(), snapshotFile); try (LogOutputStream out = new LogOutputStream(snapshotFile, false, segmentMaxSize, preallocatedSize, bufferSize)) { - for (final LogEntryProto entry : indexMap.values()) { + for (final LogEntryProto entry : list) { if (entry.getIndex() > endIndex) { break; } else { @@ -256,13 +241,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { snapshot.getFile().getPath().toFile(), 0, endIndex, false)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { - put(entry); + list.add(entry); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); } } Preconditions.assertTrue( - !indexMap.isEmpty() && endIndex == indexMap.lastKey(), - "endIndex=%s, indexMap=%s", endIndex, indexMap); + !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(), + "endIndex=%s, list=%s", endIndex, list); this.endIndexLastCkpt = endIndex; setLastAppliedTermIndex(snapshot.getTermIndex()); this.storage.loadLatestSnapshot(); @@ -279,21 +264,18 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { */ @Override public CompletableFuture<Message> query(Message request) { - final String string = request.getContent().toStringUtf8(); - Exception exception; + final ByteString bytes = request.getContent(); try { - LOG.info("query " + string); - final LogEntryProto entry = dataMap.get(string); - if (entry != null) { - return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); - } - exception = new IndexOutOfBoundsException("Log entry not found for query " + string); + final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex() + : Long.parseLong(bytes.toStringUtf8()); + LOG.info("query log index " + index); + final LogEntryProto entry = list.get(Math.toIntExact(index - 1)); + return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); } catch (Exception e) { LOG.warn("Failed request " + request, e); - exception = e; + return JavaUtils.completeExceptionally(new StateMachineException( + "Failed request " + request, e)); } - return JavaUtils.completeExceptionally(new StateMachineException( - "Failed request " + request, exception)); } static final ByteString STATE_MACHINE_DATA = ByteString.copyFromUtf8("StateMachine Data"); @@ -332,7 +314,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } public LogEntryProto[] getContent() { - return indexMap.values().toArray(new LogEntryProto[0]); + return list.toArray(new LogEntryProto[list.size()]); } public void blockStartTransaction() {