Repository: incubator-ratis
Updated Branches:
  refs/heads/master 959d493c0 -> 609773e03
RATIS-438. RaftBasicTests.testWithLoad may fail. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/609773e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/609773e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/609773e0 Branch: refs/heads/master Commit: 609773e03c96f733829a93cc5bbcc3febd25408d Parents: 959d493 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Sat Dec 1 12:50:04 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Sat Dec 1 12:50:04 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/JavaUtils.java | 19 +- .../java/org/apache/ratis/RaftAsyncTests.java | 186 +++++++++---------- .../java/org/apache/ratis/RaftBasicTests.java | 92 +++++---- .../java/org/apache/ratis/RaftTestUtil.java | 10 +- .../SimpleStateMachine4Testing.java | 6 +- .../ratis/grpc/TestRaftAsyncWithGrpc.java | 2 +- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 42 ++--- 7 files changed, 187 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index f3b0a0d..769e12f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -72,6 +72,21 @@ public interface JavaUtils { return trace[3]; } + static <T extends Throwable> void runAsUnchecked(CheckedRunnable<T> runnable) { + runAsUnchecked(runnable::run, RuntimeException::new); + } + + static <THROWABLE extends Throwable> void runAsUnchecked( + 
CheckedRunnable<THROWABLE> runnable, Function<THROWABLE, ? extends RuntimeException> converter) { + try { + runnable.run(); + } catch(RuntimeException | Error e) { + throw e; + } catch(Throwable t) { + throw converter.apply(cast(t)); + } + } + /** * Invoke {@link Callable#call()} and, if there any, * wrap the checked exception by {@link RuntimeException}. @@ -88,9 +103,7 @@ public interface JavaUtils { } catch(RuntimeException | Error e) { throw e; } catch(Throwable t) { - @SuppressWarnings("unchecked") - final THROWABLE casted = (THROWABLE)t; - throw converter.apply(casted); + throw converter.apply(cast(t)); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index e459e7c..0719976 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -18,26 +18,31 @@ package org.apache.ratis; import org.apache.log4j.Level; +import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.*; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.RaftRetryFailureException; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.retry.RetryPolicies; import 
org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; -import org.apache.ratis.proto.RaftProtos.CommitInfoProto; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -45,22 +50,23 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.ratis.RaftTestUtil.waitForLeader; public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest implements MiniRaftCluster.Factory.Get<CLUSTER> { - static { + { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } public static final int NUM_SERVERS = 3; - @Before - public void setup() { + { getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); } @@ -91,42 +97,43 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba @Test public void testRequestAsyncWithRetryPolicy() throws Exception { - LOG.info("Running 
testWatchRequestsWithRetryPolicy"); - try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { - int maxRetries = 3; - final RetryPolicy retryPolicy = RetryPolicies - .retryUpToMaximumCountWithFixedSleep(maxRetries, TimeDuration.valueOf(1, TimeUnit.SECONDS)); - cluster.start(); - final RaftClient writeClient = - cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), retryPolicy); + runWithNewCluster(NUM_SERVERS, this::runTestRequestAsyncWithRetryPolicy); + } + + void runTestRequestAsyncWithRetryPolicy(CLUSTER cluster) throws Exception { + final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + 3, TimeDuration.valueOf(1, TimeUnit.SECONDS)); + final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); + + try(final RaftClient writeClient = cluster.createClient(leader.getId(), retryPolicy)) { // blockStartTransaction of the leader so that no transaction can be committed MAJORITY - final RaftServerImpl leader = cluster.getLeader(); LOG.info("block leader {}", leader.getId()); SimpleStateMachine4Testing.get(leader).blockStartTransaction(); - RaftClientReply reply = - writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get(); + final SimpleMessage[] messages = SimpleMessage.create(2); + final RaftClientReply reply = writeClient.sendAsync(messages[0]).get(); RaftRetryFailureException rfe = reply.getRetryFailureException(); - Assert.assertTrue(rfe != null); + Assert.assertNotNull(rfe); Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString())); + // unblock leader so that the next transaction can be committed. SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); // make sure the the next request succeeds. 
This will ensure the first // request completed - writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get(); - } + writeClient.sendAsync(messages[1]).get(); } + } @Test public void testAsyncRequestSemaphore() throws Exception { - LOG.info("Running testAsyncRequestSemaphore"); - final CLUSTER cluster = newCluster(NUM_SERVERS); - Assert.assertNull(cluster.getLeader()); - cluster.start(); + runWithNewCluster(NUM_SERVERS, this::runTestAsyncRequestSemaphore); + } + + void runTestAsyncRequestSemaphore(CLUSTER cluster) throws Exception { waitForLeader(cluster); int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties()); CompletableFuture[] futures = new CompletableFuture[numMessages + 1]; - final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages); + final SimpleMessage[] messages = SimpleMessage.create(numMessages); final RaftClient client = cluster.createClient(); //Set blockTransaction flag so that transaction blocks cluster.getServers().stream() @@ -141,11 +148,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba futures[i] = client.sendAsync(messages[i]); blockedRequestsCount.decrementAndGet(); } - Assert.assertTrue(blockedRequestsCount.get() == 0); + Assert.assertEquals(0, blockedRequestsCount.get()); futures[numMessages] = CompletableFuture.supplyAsync(() -> { blockedRequestsCount.incrementAndGet(); - client.sendAsync(new RaftTestUtil.SimpleMessage("n1")); + client.sendAsync(new SimpleMessage("n1")); blockedRequestsCount.decrementAndGet(); return null; }); @@ -154,7 +161,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba while (blockedRequestsCount.get() != 1) { Thread.sleep(1000); } - Assert.assertTrue(blockedRequestsCount.get() == 1); + Assert.assertEquals(1, blockedRequestsCount.get()); //Since all semaphore permits are acquired the last message sent is in queue RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1); 
@@ -167,19 +174,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba for(int i=0; i<=numMessages; i++){ futures[i].join(); } - Assert.assertTrue(blockedRequestsCount.get() == 0); - cluster.shutdown(); + Assert.assertEquals(0, blockedRequestsCount.get()); } void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception { - final CLUSTER cluster = newCluster(killLeader? 5: 3); - try { - cluster.start(); - waitForLeader(cluster); - RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster, LOG); - } finally { - cluster.shutdown(); - } + runWithNewCluster(killLeader? 5: 3, + cluster -> RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster, LOG)); } @Test @@ -194,21 +194,18 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba @Test public void testWithLoadAsync() throws Exception { - LOG.info("Running testWithLoadAsync"); - final CLUSTER cluster = newCluster(NUM_SERVERS); - cluster.start(); - waitForLeader(cluster); - RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG); - cluster.shutdown(); + runWithNewCluster(NUM_SERVERS, + cluster -> RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG)); } @Test public void testStaleReadAsync() throws Exception { - final int numMesssages = 10; - final CLUSTER cluster = newCluster(NUM_SERVERS); + runWithNewCluster(NUM_SERVERS, this::runTestStaleReadAsync); + } + void runTestStaleReadAsync(CLUSTER cluster) throws Exception { + final int numMesssages = 10; try (RaftClient client = cluster.createClient()) { - cluster.start(); RaftTestUtil.waitForLeader(cluster); // submit some messages @@ -216,17 +213,19 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba for (int i = 0; i < numMesssages; i++) { final String s = "" + i; LOG.info("sendAsync " + s); - futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s))); + futures.add(client.sendAsync(new SimpleMessage(s))); } 
Assert.assertEquals(numMesssages, futures.size()); - RaftClientReply lastWriteReply = null; + final List<RaftClientReply> replies = new ArrayList<>(); for (CompletableFuture<RaftClientReply> f : futures) { - lastWriteReply = f.join(); - Assert.assertTrue(lastWriteReply.isSuccess()); + final RaftClientReply r = f.join(); + Assert.assertTrue(r.isSuccess()); + replies.add(r); } futures.clear(); // Use a follower with the max commit index + final RaftClientReply lastWriteReply = replies.get(replies.size() - 1); final RaftPeerId leader = lastWriteReply.getServerId(); LOG.info("leader = " + leader); final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos(); @@ -235,70 +234,72 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba .filter(info -> !RaftPeerId.valueOf(info.getServer().getId()).equals(leader)) .max(Comparator.comparing(CommitInfoProto::getCommitIndex)).get(); final RaftPeerId follower = RaftPeerId.valueOf(followerCommitInfo.getServer().getId()); - LOG.info("max follower = " + follower); + final long followerCommitIndex = followerCommitInfo.getCommitIndex(); + LOG.info("max follower = {}, commitIndex = {}", follower, followerCommitIndex); // test a failure case testFailureCaseAsync("sendStaleReadAsync(..) 
with a larger commit index", () -> client.sendStaleReadAsync( - new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE), + new SimpleMessage("" + Long.MAX_VALUE), followerCommitInfo.getCommitIndex(), follower), StateMachineException.class, IndexOutOfBoundsException.class); // test sendStaleReadAsync for (int i = 0; i < numMesssages; i++) { - final int query = i; - LOG.info("sendStaleReadAsync, query=" + query); - final Message message = new RaftTestUtil.SimpleMessage("" + query); + final RaftClientReply reply = replies.get(i); + final String query = "" + i; + LOG.info("query=" + query + ", reply=" + reply); + final Message message = new SimpleMessage(query); final CompletableFuture<RaftClientReply> readFuture = client.sendReadOnlyAsync(message); - final CompletableFuture<RaftClientReply> staleReadFuture = client.sendStaleReadAsync( - message, followerCommitInfo.getCommitIndex(), follower); - - futures.add(readFuture.thenApply(r -> getMessageContent(r)) - .thenCombine(staleReadFuture.thenApply(r -> getMessageContent(r)), (expected, computed) -> { - try { - LOG.info("query " + query + " returns " - + LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8()); - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - - Assert.assertEquals("log entry mismatch for query=" + query, expected, computed); - return null; - }) - ); + + futures.add(readFuture.thenCompose(r -> { + if (reply.getLogIndex() <= followerCommitIndex) { + LOG.info("sendStaleReadAsync, query=" + query); + return client.sendStaleReadAsync(message, followerCommitIndex, follower); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenApply(staleReadReply -> { + if (staleReadReply == null) { + return null; + } + + final ByteString expected = readFuture.join().getMessage().getContent(); + final ByteString computed = staleReadReply.getMessage().getContent(); + try { + LOG.info("query " + query + " returns " + + 
LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8()); + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + + Assert.assertEquals("log entry mismatch for query=" + query, expected, computed); + return null; + })); } JavaUtils.allOf(futures).join(); - } finally { - cluster.shutdown(); } } - static ByteString getMessageContent(RaftClientReply reply) { - Assert.assertTrue(reply.isSuccess()); - return reply.getMessage().getContent(); - } - @Test public void testRequestTimeout() throws Exception { final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties()); RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS)); - final CLUSTER cluster = newCluster(NUM_SERVERS); - cluster.start(); - RaftBasicTests.testRequestTimeout(true, cluster, LOG); - cluster.shutdown(); + runWithNewCluster(NUM_SERVERS, cluster -> RaftBasicTests.testRequestTimeout(true, cluster, LOG)); //reset for the other tests RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime); } @Test - public void testAppendEntriesTimeout() - throws IOException, InterruptedException, ExecutionException { + public void testAppendEntriesTimeout() throws Exception { + runWithNewCluster(NUM_SERVERS, this::runTestAppendEntriesTimeout); + } + + void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception { LOG.info("Running testAppendEntriesTimeout"); final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties()); RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS)); - final CLUSTER cluster = newCluster(NUM_SERVERS); - cluster.start(); waitForLeader(cluster); long time = System.currentTimeMillis(); long waitTime = 5000; @@ -309,7 +310,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba .map(SimpleStateMachine4Testing::get) 
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData); - CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc")); + CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new SimpleMessage("abc")); Thread.sleep(waitTime); // replyFuture should not be completed until append request is unblocked. Assert.assertTrue(!replyFuture.isDone()); @@ -322,7 +323,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba replyFuture.get(); Assert.assertTrue(System.currentTimeMillis() - time > waitTime); } - cluster.shutdown(); //reset for the other tests RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index f37fc21..90cc627 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -35,7 +35,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Test; @@ -54,10 +53,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.ratis.RaftTestUtil.logEntriesContains; -import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread; import static org.apache.ratis.RaftTestUtil.waitForLeader; -import static org.junit.Assert.assertTrue; public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster> extends BaseTest @@ -67,38 +63,39 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration - .valueOf(5, TimeUnit.SECONDS)); + RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS)); } public static final int NUM_SERVERS = 5; @Test public void testBasicAppendEntries() throws Exception { - try(CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTestBasicAppendEntries(false, false, 10, cluster, LOG); - } + runWithNewCluster(NUM_SERVERS, cluster -> + runTestBasicAppendEntries(false, false, 10, cluster, LOG)); } @Test public void testBasicAppendEntriesKillLeader() throws Exception { - try(CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTestBasicAppendEntries(false, true, 10, cluster, LOG); - } + runWithNewCluster(NUM_SERVERS, cluster -> + runTestBasicAppendEntries(false, true, 10, cluster, LOG)); } - static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) { - try { - Thread.sleep(killSleepMs); - cluster.killServer(id); - Thread.sleep(restartSleepMs); - LOG.info("restart server: " + id); - cluster.restartServer(id, false); - } catch (Exception e) { - ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG); - } + static CompletableFuture<Void> killAndRestartServer( + RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(killSleepMs); + cluster.killServer(id); + Thread.sleep(restartSleepMs); + LOG.info("restart server: " + id); + cluster.restartServer(id, false); + future.complete(null); + } catch (Exception e) { 
+ ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG); + } + }).start(); + return future; } static void runTestBasicAppendEntries( @@ -112,10 +109,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> RaftServerImpl leader = waitForLeader(cluster); final long term = leader.getState().getCurrentTerm(); - new Thread(() -> killAndRestartServer(cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG)).start(); + final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer( + cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG); + final CompletableFuture<Void> killAndRestartLeader; if (killLeader) { LOG.info("killAndRestart leader " + leader.getId()); - new Thread(() -> killAndRestartServer(leader.getId(), 2000, 4000, cluster, LOG)).start(); + killAndRestartLeader = killAndRestartServer(leader.getId(), 2000, 4000, cluster, LOG); + } else { + killAndRestartLeader = CompletableFuture.completedFuture(null); } LOG.info(cluster.printServers()); @@ -138,7 +139,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> }); } else { final RaftClientReply reply = client.send(message); - Preconditions.assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); } } if (async) { @@ -148,6 +149,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> } Thread.sleep(cluster.getTimeoutMax().toInt(TimeUnit.MILLISECONDS) + 100); LOG.info(cluster.printAllLogs()); + killAndRestartFollower.join(); + killAndRestartLeader.join(); for(RaftServerProxy server : cluster.getServers()) { final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId()); @@ -161,9 +164,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> @Test public void testOldLeaderCommit() throws Exception { - LOG.info("Running testOldLeaderCommit"); - final CLUSTER cluster = newCluster(NUM_SERVERS); - cluster.start(); + runWithNewCluster(NUM_SERVERS, 
this::runTestOldLeaderCommit); + } + + void runTestOldLeaderCommit(CLUSTER cluster) throws Exception { final RaftServerImpl leader = waitForLeader(cluster); final RaftPeerId leaderId = leader.getId(); final long term = leader.getState().getCurrentTerm(); @@ -180,7 +184,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> Thread.sleep(cluster.getMaxTimeout() + 100); RaftLog followerLog = followerToSendLog.getState().getLog(); - assertTrue(logEntriesContains(followerLog, messages)); + Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages)); LOG.info(String.format("killing old leader: %s", leaderId.toString())); cluster.killServer(leaderId); @@ -198,15 +202,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> cluster.getServerAliveStream().map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages)); - LOG.info("terminating testOldLeaderCommit test"); - cluster.shutdown(); } @Test public void testOldLeaderNotCommit() throws Exception { - LOG.info("Running testOldLeaderNotCommit"); - final CLUSTER cluster = newCluster(NUM_SERVERS); - cluster.start(); + runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderNotCommit); + } + + void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception { final RaftPeerId leaderId = waitForLeader(cluster).getId(); List<RaftServerImpl> followers = cluster.getFollowers(); @@ -217,10 +220,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> } SimpleMessage[] messages = SimpleMessage.create(1); - sendMessageInNewThread(cluster, messages); + RaftTestUtil.sendMessageInNewThread(cluster, messages); Thread.sleep(cluster.getMaxTimeout() + 100); - logEntriesContains(followerToCommit.getState().getLog(), messages); + RaftTestUtil.logEntriesContains(followerToCommit.getState().getLog(), messages); cluster.killServer(leaderId); cluster.killServer(followerToCommit.getId()); @@ -236,7 +239,6 @@ public abstract class 
RaftBasicTests<CLUSTER extends MiniRaftCluster> cluster.getServerAliveStream() .map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate)); - cluster.shutdown(); } static class Client4TestWithLoad extends Thread { @@ -318,13 +320,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> @Test public void testWithLoad() throws Exception { - try(CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - testWithLoad(10, 500, false, cluster, LOG); - } + runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG)); } - public static void testWithLoad(final int numClients, final int numMessages, + static void testWithLoad(final int numClients, final int numMessages, boolean useAsync, MiniRaftCluster cluster, Logger LOG) throws Exception { LOG.info("Running testWithLoad: numClients=" + numClients + ", numMessages=" + numMessages + ", async=" + useAsync); @@ -371,12 +370,12 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> int count = 0; for(;; ) { - if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) { + if (clients.stream().noneMatch(Client4TestWithLoad::isRunning)) { break; } final int n = clients.stream().mapToInt(c -> c.step.get()).sum(); - assertTrue(n >= lastStep.get()); + Assert.assertTrue(n >= lastStep.get()); if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 steps. 
Thread.sleep(10); @@ -406,7 +405,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> } public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception { - LOG.info("Running testRequestTimeout"); waitForLeader(cluster); long time = System.currentTimeMillis(); try (final RaftClient client = cluster.createClient()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 19422bc..a96b917 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -156,11 +156,17 @@ public interface RaftTestUtil { } } - static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage... 
expectedMessages) { + static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage[] expectedMessages) { + for(SimpleMessage m : expectedMessages) { + assertLogEntries(cluster, m); + } + } + + static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage expectedMessage) { final int size = cluster.getServers().size(); final long count = cluster.getServerAliveStream() .map(s -> s.getState().getLog()) - .filter(log -> logEntriesContains(log, expectedMessages)) + .filter(log -> logEntriesContains(log, expectedMessage)) .count(); if (2*count <= size) { throw new AssertionError("Not in majority: size=" + size http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index d5fdf53..d4c4021 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -154,7 +154,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { Preconditions.assertNull(previous, "previous"); final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); dataMap.put(s, entry); - LOG.info("put {}, {} -> {}", entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry)); + LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry)); } @Override @@ -290,7 +290,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { if (entry != null) { return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); } - exception = new IndexOutOfBoundsException("Log entry not found for query " + string); + exception = new IndexOutOfBoundsException(getId() + ": LogEntry not found for query " + string); } catch (Exception e) { LOG.warn("Failed request " + request, e); exception = e; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java index 614787e..a12c52f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index d98be53..50b2d7f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,11 +27,13 @@ import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Test; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.ratis.RaftTestUtil.waitForLeader; @@ -53,19 +55,16 @@ public class TestRaftWithGrpc @Test public void testRequestTimeout() throws Exception { - try(MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - testRequestTimeout(false, cluster, LOG); - } + runWithNewCluster(NUM_SERVERS, cluster -> testRequestTimeout(false, cluster, LOG)); } @Test public void testUpdateViaHeartbeat() throws Exception { - LOG.info("Running testUpdateViaHeartbeat"); - final MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS); - cluster.start(); + runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat); + } + + void 
runTestUpdateViaHeartbeat(MiniRaftClusterWithGrpc cluster) throws Exception { waitForLeader(cluster); - long waitTime = 5000; try (final RaftClient client = cluster.createClient()) { // block append requests cluster.getServerAliveStream() @@ -75,7 +74,7 @@ public class TestRaftWithGrpc CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc")); - Thread.sleep(waitTime); + TimeDuration.valueOf(5 , TimeUnit.SECONDS).sleep(); // replyFuture should not be completed until append request is unblocked. Assert.assertTrue(!replyFuture.isDone()); // unblock append request. @@ -84,27 +83,28 @@ public class TestRaftWithGrpc .map(SimpleStateMachine4Testing::get) .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData); - long index = cluster.getLeader().getState().getLog().getNextIndex(); + final long index = cluster.getLeader().getState().getLog().getNextIndex(); TermIndex[] leaderEntries = cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE); // The entries have been appended in the followers // although the append entry timed out at the leader - cluster.getServerAliveStream().forEach(raftServer -> { + + final TimeDuration sleepTime = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + cluster.getServerAliveStream().filter(impl -> !impl.isLeader()).forEach(raftServer -> + JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> { Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index); - if (!raftServer.isLeader()) { - TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE); - Assert.assertArrayEquals(serverEntries, leaderEntries); - } - }); + TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE); + Assert.assertArrayEquals(serverEntries, leaderEntries); + }, 10, sleepTime, "assertRaftLog-" + raftServer.getId(), LOG))); // Wait for heartbeats from leader to be received by followers Thread.sleep(500); - 
RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> { + RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> + JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> { // FollowerInfo in the leader state should have updated next and match index. final long followerMatchIndex = logAppender.getFollower().getMatchIndex(); Assert.assertTrue(followerMatchIndex >= index - 1); Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex()); - }); + }, 10, sleepTime, "assertRaftLog-" + logAppender.getFollower(), LOG))); } - cluster.shutdown(); } }
