Repository: incubator-ratis Updated Branches: refs/heads/master 27c2dfe6e -> 8055e5d6e
RATIS-381. RaftTestUtil.waitForLeader should not return null. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8055e5d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8055e5d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8055e5d6 Branch: refs/heads/master Commit: 8055e5d6e4b885b05aee302cd5f99fa1622e63c3 Parents: 27c2dfe Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Sat Nov 3 20:29:36 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Sat Nov 3 20:29:36 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/util/CheckedRunnable.java | 10 +- .../org/apache/ratis/util/CheckedSupplier.java | 10 +- .../java/org/apache/ratis/util/JavaUtils.java | 98 +++++++++++--------- .../org/apache/ratis/util/TimeDuration.java | 20 +++- .../org/apache/ratis/grpc/TestRaftStream.java | 2 +- .../java/org/apache/ratis/MiniRaftCluster.java | 75 ++++++++++++--- .../java/org/apache/ratis/RaftTestUtil.java | 82 ++++++++-------- .../java/org/apache/ratis/RetryCacheTests.java | 11 ++- .../apache/ratis/server/ServerRestartTests.java | 6 ++ .../server/impl/GroupManagementBaseTest.java | 9 +- .../ratis/server/impl/LeaderElectionTests.java | 46 ++++++--- .../impl/RaftReconfigurationBaseTest.java | 2 +- .../ratis/server/impl/RaftServerTestUtil.java | 9 +- .../server/impl/ServerInformationBaseTest.java | 4 +- 14 files changed, 243 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java index b6e90b9..2911254 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -25,4 +25,12 @@ public interface CheckedRunnable<THROWABLE extends Throwable> { * except that this method is declared with a throws-clause. */ void run() throws THROWABLE; + + static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> asCheckedSupplier( + CheckedRunnable<THROWABLE> runnable) { + return () -> { + runnable.run(); + return null; + }; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java index 06abe4c..9bbb009 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,12 +27,4 @@ public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> { * except that this method is declared with a throws-clause. */ OUTPUT get() throws THROWABLE; - - static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> valueOf( - CheckedRunnable<THROWABLE> runnable) { - return () -> { - runnable.run(); - return null; - }; - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 923e03d..b855b2a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -1,23 +1,20 @@ /* - * * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package org.apache.ratis.util; import org.slf4j.Logger; @@ -49,6 +46,7 @@ public interface JavaUtils { Logger LOG = LoggerFactory.getLogger(JavaUtils.class); DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS"); + CompletableFuture[] EMPTY_COMPLETABLE_FUTURE_ARRAY = {}; static String date() { return DATE_FORMAT.format(new Date()); @@ -92,20 +90,6 @@ public interface JavaUtils { } /** - * Get the value from the future and then consume it. - */ - static <T> void getAndConsume(CompletableFuture<T> future, Consumer<T> consumer) { - final T t; - try { - t = future.get(); - } catch (Exception ignored) { - LOG.warn("Failed to get()", ignored); - return; - } - consumer.accept(t); - } - - /** * Create a memoized supplier which gets a value by invoking the initializer once * and then keeps returning the same value as its supplied results. * @@ -129,14 +113,23 @@ public interface JavaUtils { return ROOT_THREAD_GROUP.get(); } - /** Attempt to get a return value from the given supplier multiple times. */ + /** @deprecated use {@link #attempt(CheckedSupplier, int, TimeDuration, String, Logger)} */ + @Deprecated static <RETURN, THROWABLE extends Throwable> RETURN attempt( CheckedSupplier<RETURN, THROWABLE> supplier, int numAttempts, long sleepMs, String name, Logger log) throws THROWABLE, InterruptedException { + return attempt(supplier, numAttempts, TimeDuration.valueOf(sleepMs, TimeUnit.MILLISECONDS), name, log); + } + + /** Attempt to get a return value from the given supplier multiple times. */ + static <RETURN, THROWABLE extends Throwable> RETURN attempt( + CheckedSupplier<RETURN, THROWABLE> supplier, + int numAttempts, TimeDuration sleepTime, String name, Logger log) + throws THROWABLE, InterruptedException { Objects.requireNonNull(supplier, "supplier == null"); Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0"); - Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0"); + Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + sleepTime + " < 0"); for(int i = 1; i <= numAttempts; i++) { try { @@ -147,31 +140,45 @@ public interface JavaUtils { } if (log != null && log.isWarnEnabled()) { log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts - + ": " + t + ", sleep " + sleepMs + "ms and then retry.", t); + + ": " + t + ", sleep " + sleepTime + " and then retry.", t); } } - if (sleepMs > 0) { - Thread.sleep(sleepMs); - } + sleepTime.sleep(); } throw new IllegalStateException("BUG: this line should be unreachable."); } - /** Attempt to run the given op multiple times. */ + /** @deprecated use {@link #attempt(CheckedRunnable, int, TimeDuration, String, Logger)} */ + @Deprecated static <THROWABLE extends Throwable> void attempt( CheckedRunnable<THROWABLE> op, int numAttempts, long sleepMs, String name, Logger log) throws THROWABLE, InterruptedException { - attempt(CheckedSupplier.valueOf(op), numAttempts, sleepMs, name, log); + attempt(op, numAttempts, TimeDuration.valueOf(sleepMs, TimeUnit.MILLISECONDS), name, log); } - /** Attempt to wait the given condition to return true multiple times. */ + /** Attempt to run the given op multiple times. */ + static <THROWABLE extends Throwable> void attempt( + CheckedRunnable<THROWABLE> runnable, int numAttempts, TimeDuration sleepTime, String name, Logger log) + throws THROWABLE, InterruptedException { + attempt(CheckedRunnable.asCheckedSupplier(runnable), numAttempts, sleepTime, name, log); + } + + /** @deprecated use {@link #attempt(BooleanSupplier, int, TimeDuration, String, Logger)} */ + @Deprecated static void attempt( BooleanSupplier condition, int numAttempts, long sleepMs, String name, Logger log) throws InterruptedException { + attempt(condition, numAttempts, TimeDuration.valueOf(sleepMs, TimeUnit.MILLISECONDS), name, log); + } + + /** Attempt to wait the given condition to return true multiple times. */ + static void attempt( + BooleanSupplier condition, int numAttempts, TimeDuration sleepTime, String name, Logger log) + throws InterruptedException { Objects.requireNonNull(condition, "condition == null"); Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0"); - Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0"); + Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + sleepTime + " < 0"); for(int i = 1; i <= numAttempts; i++) { if (condition.getAsBoolean()) { @@ -179,11 +186,10 @@ public interface JavaUtils { } if (log != null && log.isWarnEnabled()) { log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts - + ": sleep " + sleepMs + "ms and then retry."); - } - if (sleepMs > 0) { - Thread.sleep(sleepMs); + + ": sleep " + sleepTime + " and then retry."); } + + sleepTime.sleep(); } if (!condition.getAsBoolean()) { @@ -222,7 +228,7 @@ public interface JavaUtils { } static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) { - return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])); + return CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY)); } static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndWrapAsCompletionException( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java index 8a7c44a..2fad806 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,11 +17,18 @@ */ package org.apache.ratis.util; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.LongUnaryOperator; /** * Time duration is represented together with a {@link TimeUnit}. + * + * This class is immutable. */ public class TimeDuration implements Comparable<TimeDuration> { @@ -119,6 +126,15 @@ public class TimeDuration implements Comparable<TimeDuration> { return Math.toIntExact(toLong(targetUnit)); } + /** + * Apply the given operator to the duration value of this object. + * + * @return a new object with the new duration value and the same unit of this object. + */ + public TimeDuration apply(LongUnaryOperator operator) { + return valueOf(operator.applyAsLong(duration), unit); + } + public boolean isNegative() { return duration < 0; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index b3f6a41..ba31b2b 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -291,7 +291,7 @@ public class TestRaftStream extends BaseTest { // force change the leader Thread.sleep(500); - RaftTestUtil.waitAndKillLeader(cluster, true); + RaftTestUtil.waitAndKillLeader(cluster); final RaftServerImpl newLeader = waitForLeader(cluster); Assert.assertNotEquals(leader.getId(), newLeader.getId()); Thread.sleep(500); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 3dd0612..0e352f4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -143,6 +144,10 @@ public abstract class MiniRaftCluster implements Closeable { return ids; } + public static int getIdIndex(String id) { + return Integer.parseInt(id.substring(1)); + } + protected RaftGroup group; protected final RaftProperties properties; protected final Parameters parameters; @@ -226,10 +231,16 @@ public abstract class MiniRaftCluster implements Closeable { start(); } + /** @deprecated use {@link #getTimeoutMax()}. */ + @Deprecated public int getMaxTimeout() { return RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); } + public TimeDuration getTimeoutMax() { + return RaftServerConfigKeys.Rpc.timeoutMax(properties); + } + private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean format) { LOG.info("newRaftServer: {}, {}, format? {}", id, group, format); try { @@ -410,15 +421,60 @@ public abstract class MiniRaftCluster implements Closeable { return leader; } + IllegalStateException newIllegalStateExceptionForNoLeaders(RaftGroupId groupId) { + final String g = groupId == null? "": " for " + groupId; + return new IllegalStateException("No leader yet " + g + ": " + printServers(groupId)); + } + + IllegalStateException newIllegalStateExceptionForMultipleLeaders(RaftGroupId groupId, List<RaftServerImpl> leaders) { + final String g = groupId == null? "": " for " + groupId; + return new IllegalStateException("Found multiple leaders" + g + + " at the same term (=" + leaders.get(0).getState().getCurrentTerm() + + "), leaders.size() = " + leaders.size() + " > 1, leaders = " + leaders + + ": " + printServers(groupId)); + } + + /** + * Get leader for the single group case. + * Do not use this method if this cluster has multiple groups. + * + * @return the unique leader with the highest term. Or, return null if there is no leader. + * @throws IllegalStateException if there are multiple leaders with the same highest term. + */ public RaftServerImpl getLeader() { - return getLeader((RaftGroupId)null); + return getLeader(getLeaders(null), null, leaders -> { + throw newIllegalStateExceptionForMultipleLeaders(null, leaders); + }); } - public RaftServerImpl getLeader(RaftGroupId groupId) { - return getLeader(getServerAliveStream(groupId)); + RaftServerImpl getLeader(RaftGroupId groupId, Runnable handleNoLeaders, + Consumer<List<RaftServerImpl>> handleMultipleLeaders) { + return getLeader(getLeaders(groupId), handleNoLeaders, handleMultipleLeaders); } - static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) { + static RaftServerImpl getLeader(List<RaftServerImpl> leaders, Runnable handleNoLeaders, + Consumer<List<RaftServerImpl>> handleMultipleLeaders) { + if (leaders.isEmpty()) { + if (handleNoLeaders != null) { + handleNoLeaders.run(); + } + return null; + } else if (leaders.size() > 1) { + if (handleMultipleLeaders != null) { + handleMultipleLeaders.accept(leaders); + } + return null; + } else { + return leaders.get(0); + } + } + + /** + * @return the list of leaders with the highest term (i.e. leaders with a lower term are not included). + * from the given group. + */ + private List<RaftServerImpl> getLeaders(RaftGroupId groupId) { + final Stream<RaftServerImpl> serverAliveStream = getServerAliveStream(groupId); final List<RaftServerImpl> leaders = new ArrayList<>(); serverAliveStream.filter(RaftServerImpl::isLeader).forEach(s -> { if (leaders.isEmpty()) { @@ -434,13 +490,7 @@ public abstract class MiniRaftCluster implements Closeable { } } }); - if (leaders.isEmpty()) { - return null; - } else if (leaders.size() > 1) { - throw new IllegalStateException(leaders - + ", leaders.size() = " + leaders.size() + " > 1"); - } - return leaders.get(0); + return leaders; } boolean isLeader(String leaderId) { @@ -469,7 +519,8 @@ public abstract class MiniRaftCluster implements Closeable { private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) { final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId); - return groupId != null? stream.map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId)) + return groupId != null? + stream.filter(s -> s.containsGroup(groupId)).map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId)) : stream.flatMap(s -> RaftServerTestUtil.getRaftServerImpls(s).stream()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 78e3768..e4d9c8c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -36,6 +36,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +47,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.function.Predicate; @@ -57,60 +61,54 @@ public interface RaftTestUtil { static RaftServerImpl waitForLeader(MiniRaftCluster cluster) throws InterruptedException { - return waitForLeader(cluster, false); + return waitForLeader(cluster, null); } - static RaftServerImpl waitForLeader( - MiniRaftCluster cluster, boolean tolerateMultipleLeaders) throws InterruptedException { - return waitForLeader(cluster, tolerateMultipleLeaders, null); + static RaftServerImpl waitForLeader(MiniRaftCluster cluster, RaftGroupId groupId) + throws InterruptedException { + return waitForLeader(cluster, groupId, true); } static RaftServerImpl waitForLeader( - MiniRaftCluster cluster, boolean tolerateMultipleLeaders, RaftGroupId groupId) + MiniRaftCluster cluster, RaftGroupId groupId, boolean expectLeader) throws InterruptedException { - final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1; - LOG.info(cluster.printServers(groupId)); - RaftServerImpl leader = null; - for(int i = 0; leader == null && i < 10; i++) { - Thread.sleep(sleepTime); - try { - leader = cluster.getLeader(groupId); - } catch(IllegalStateException e) { - if (!tolerateMultipleLeaders) { - throw e; - } - } - } + final String name = "waitForLeader-" + groupId + "-(expectLeader? " + expectLeader + ")"; + final int numAttempts = expectLeader? 100: 10; + final TimeDuration sleepTime = cluster.getTimeoutMax().apply(d -> (d * 3) >> 1); LOG.info(cluster.printServers(groupId)); - return leader; - } - static RaftServerImpl waitForLeader( - MiniRaftCluster cluster, final String leaderId) throws InterruptedException { - LOG.info(cluster.printServers()); - for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) { - RaftServerImpl currLeader = cluster.getLeader(); - LOG.info("try enforcing leader to " + leaderId + " but " + - (currLeader == null ? "no leader for this round" : "new leader is " + currLeader.getId())); - } - LOG.info(cluster.printServers()); + final AtomicReference<IllegalStateException> exception = new AtomicReference<>(); + final Runnable handleNoLeaders = () -> { + throw cluster.newIllegalStateExceptionForNoLeaders(groupId); + }; + final Consumer<List<RaftServerImpl>> handleMultipleLeaders = leaders -> { + final IllegalStateException ise = cluster.newIllegalStateExceptionForMultipleLeaders(groupId, leaders); + exception.set(ise); + }; - final RaftServerImpl leader = cluster.getLeader(); - Assert.assertEquals(leaderId, leader.getId().toString()); - return leader; - } + final RaftServerImpl leader = JavaUtils.attempt( + () -> cluster.getLeader(groupId, handleNoLeaders, handleMultipleLeaders), + numAttempts, sleepTime, name, LOG); - static String waitAndKillLeader(MiniRaftCluster cluster, - boolean expectLeader) throws InterruptedException { - final RaftServerImpl leader = waitForLeader(cluster); - if (!expectLeader) { - Assert.assertNull(leader); + LOG.info(cluster.printServers(groupId)); + if (expectLeader) { + return Optional.ofNullable(leader).orElseThrow(exception::get); } else { - Assert.assertNotNull(leader); - LOG.info("killing leader = " + leader); - cluster.killServer(leader.getId()); + if (leader == null) { + return null; + } else { + throw new IllegalStateException("expectLeader = " + expectLeader + " but leader = " + leader); + } } - return leader != null ? leader.getId().toString() : null; + } + + static String waitAndKillLeader(MiniRaftCluster cluster) throws InterruptedException { + final RaftServerImpl leader = waitForLeader(cluster); + Assert.assertNotNull(leader); + + LOG.info("killing leader = " + leader); + cluster.killServer(leader.getId()); + return leader.getId().toString(); } static boolean logEntriesContains(RaftLog log, SimpleMessage... expectedMessages) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index c962481..a77e6e4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -31,6 +31,8 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftLogIOException; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -38,7 +40,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; -import java.util.stream.LongStream; +import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; @@ -154,8 +156,11 @@ public abstract class RetryCacheTests extends BaseTest { // trigger setConfiguration cluster.setConfiguration(allPeers); - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId newLeaderId = cluster.getLeader().getId(); + final RaftPeerId newLeaderId = JavaUtils.attempt(() -> { + final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId(); + Assert.assertNotEquals(leaderId, id); + return id; + }, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "wait for a leader different than " + leaderId, LOG); Assert.assertNotEquals(leaderId, newLeaderId); // same clientId and callId in the request r = cluster.newRaftClientRequest(client.getId(), newLeaderId, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java index 531d2e2..c4cfe48 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -42,6 +42,7 @@ import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.StringUtils; +import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -50,6 +51,7 @@ import java.io.File; import java.io.RandomAccessFile; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -180,6 +182,10 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> static void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster, Logger LOG) throws Exception { cluster.start(); RaftTestUtil.waitForLeader(cluster); + for(RaftServerImpl impl : cluster.iterateServerImpls()) { + JavaUtils.attempt(() -> getOpenLogFile(impl), 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), + impl.getId() + ": wait for log file creation", LOG); + } // shutdown all servers cluster.getServers().forEach(RaftServerProxy::close); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java index 66e3c61..bc3b764 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java @@ -28,7 +28,6 @@ import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.storage.RaftStorageDirectory; import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; @@ -89,7 +88,7 @@ public abstract class GroupManagementBaseTest extends BaseTest { for(RaftPeer p : newGroup.getPeers()) { client.groupAdd(newGroup, p.getId()); } - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster)); TimeUnit.SECONDS.sleep(1); // restart the servers with null group @@ -99,7 +98,7 @@ public abstract class GroupManagementBaseTest extends BaseTest { } // the servers should retrieve the conf from the log. - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster)); cluster.shutdown(); } @@ -173,7 +172,7 @@ public abstract class GroupManagementBaseTest extends BaseTest { client.groupAdd(groups[i], p.getId()); } } - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid)); + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, gid)); checker.accept(cluster, groups[i]); } printThreadCount(type, "start groups"); @@ -220,7 +219,7 @@ public abstract class GroupManagementBaseTest extends BaseTest { client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray())); } - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster)); checker.accept(cluster, groups[chosen]); LOG.info("update groups: " + cluster.printServers()); printThreadCount(type, "update groups"); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 54fbf5f..a48edc4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -30,12 +30,12 @@ import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; import java.util.Iterator; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import static org.apache.ratis.RaftTestUtil.waitAndKillLeader; import static org.apache.ratis.RaftTestUtil.waitForLeader; public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> @@ -51,10 +51,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> LOG.info("Running testBasicLeaderElection"); final MiniRaftCluster cluster = newCluster(5); cluster.start(); - waitAndKillLeader(cluster, true); - waitAndKillLeader(cluster, true); - waitAndKillLeader(cluster, true); - waitAndKillLeader(cluster, false); + RaftTestUtil.waitAndKillLeader(cluster); + RaftTestUtil.waitAndKillLeader(cluster); + RaftTestUtil.waitAndKillLeader(cluster); + testFailureCase("waitForLeader after killed a majority of servers", + () -> RaftTestUtil.waitForLeader(cluster, null, false), + IllegalStateException.class); cluster.shutdown(); } @@ -76,15 +78,33 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> @Test public void testEnforceLeader() throws Exception { - final int numServer = 3; LOG.info("Running testEnforceLeader"); - final String leader = "s" + ThreadLocalRandom.current().nextInt(numServer); - LOG.info("enforce leader to " + leader); - final MiniRaftCluster cluster = newCluster(numServer); - cluster.start(); - waitForLeader(cluster); - waitForLeader(cluster, leader); - cluster.shutdown(); + final int numServer = 5; + try(final MiniRaftCluster cluster = newCluster(numServer)) { + cluster.start(); + + final RaftPeerId firstLeader = waitForLeader(cluster).getId(); + LOG.info("firstLeader = {}", firstLeader); + final int first = MiniRaftCluster.getIdIndex(firstLeader.toString()); + + final int random = ThreadLocalRandom.current().nextInt(numServer - 1); + final String newLeader = "s" + (random < first? random: random + 1); + LOG.info("enforce leader to {}", newLeader); + enforceLeader(cluster, newLeader, LOG); + } + } + + static void enforceLeader(MiniRaftCluster cluster, final String newLeader, Logger LOG) throws InterruptedException { + LOG.info(cluster.printServers()); + for(int i = 0; !cluster.tryEnforceLeader(newLeader) && i < 10; i++) { + RaftServerImpl currLeader = cluster.getLeader(); + LOG.info("try enforcing leader to " + newLeader + " but " + + (currLeader == null ? "no leader for round " + i : "new leader is " + currLeader.getId())); + } + LOG.info(cluster.printServers()); + + final RaftServerImpl leader = cluster.getLeader(); + Assert.assertEquals(newLeader, leader.getId().toString()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 246a9a2..7fde1c5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -375,7 +375,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { committedIndex <= 1); LOG.info("kill the current leader"); - final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true); + final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster); LOG.info("start new peers: {}", Arrays.asList(c1.newPeers)); for (RaftPeer np : c1.newPeers) { cluster.restartServer(np.getId(), false); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 827117e..ee9008a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -22,10 +22,12 @@ import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.stream.Stream; @@ -36,13 +38,13 @@ public class RaftServerTestUtil { public static void waitAndCheckNewConf(MiniRaftCluster cluster, RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers) throws Exception { - final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2); + final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n * (numOfRemovedPeers + 2)); JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, peers, deadPeers), - 3, sleepMs, "waitAndCheckNewConf", LOG); + 10, sleepTime, "waitAndCheckNewConf", LOG); } private static void waitAndCheckNewConf(MiniRaftCluster cluster, RaftPeer[] peers, Collection<String> deadPeers) { - LOG.info(cluster.printServers()); + LOG.info("waitAndCheckNewConf: peers={}, deadPeers={}, {}", Arrays.asList(peers), deadPeers, cluster.printServers()); Assert.assertNotNull(cluster.getLeader()); int numIncluded = 0; @@ -50,6 +52,7 @@ public class RaftServerTestUtil { final RaftConfiguration current = RaftConfiguration.newBuilder() .setConf(peers).setLogEntryIndex(0).build(); for (RaftServerImpl server : cluster.iterateServerImpls()) { + LOG.info("checking {}", server); if (deadPeers != null && deadPeers.contains(server.getId().toString())) { if (current.containsInConf(server.getId())) { deadIncluded++; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java index 30aae33..77a4209 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java @@ -20,7 +20,6 @@ package org.apache.ratis.server.impl; import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.*; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; @@ -141,9 +140,8 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster> RaftClientReply sendMessages(int n, MiniRaftCluster cluster) throws Exception { LOG.info("sendMessages: " + n); - final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster, true, cluster.getGroupId()).getId(); RaftClientReply reply = null; - try(final RaftClient client = cluster.createClient(leader)) { + try(final RaftClient client = cluster.createClient()) { for(int i = 0; i < n; i++) { reply = client.send(Message.valueOf("m" + i)); }
