Repository: incubator-ratis Updated Branches: refs/heads/master c6c9ddf4d -> acd507e6e
RATIS-300. Support multiple RaftServerImpl in RaftServerProxy. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/acd507e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/acd507e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/acd507e6 Branch: refs/heads/master Commit: acd507e6e2a5520682aa6ba37c5042a377f01cd0 Parents: c6c9ddf Author: Mukul Kumar Singh <[email protected]> Authored: Mon Sep 3 14:30:58 2018 +0530 Committer: Mukul Kumar Singh <[email protected]> Committed: Mon Sep 3 14:30:58 2018 +0530 ---------------------------------------------------------------------- .../ratis/protocol/AlreadyClosedException.java | 27 ++ .../java/org/apache/ratis/util/IOUtils.java | 18 +- .../java/org/apache/ratis/util/JavaUtils.java | 17 +- .../java/org/apache/ratis/util/LifeCycle.java | 3 +- .../org/apache/ratis/util/Preconditions.java | 8 + .../org/apache/ratis/TestRestartRaftPeer.java | 4 +- .../ratis/examples/ParameterizedBaseTest.java | 2 +- .../org/apache/ratis/grpc/RaftGRpcService.java | 14 +- .../ratis/grpc/server/GRpcLogAppender.java | 2 +- .../ratis/grpc/TestRaftServerWithGrpc.java | 4 +- .../hadooprpc/server/HadoopRpcService.java | 2 +- .../ratis/netty/server/NettyRpcService.java | 2 +- .../org/apache/ratis/server/RaftServer.java | 5 +- .../org/apache/ratis/server/RaftServerRpc.java | 3 +- .../ratis/server/impl/RaftServerImpl.java | 20 +- .../ratis/server/impl/RaftServerProxy.java | 301 +++++++++++++------ .../server/impl/RaftServerRpcWithProxy.java | 11 +- .../org/apache/ratis/server/impl/RoleInfo.java | 5 + .../ratis/server/impl/ServerImplUtils.java | 23 +- .../apache/ratis/server/storage/RaftLog.java | 2 +- .../java/org/apache/ratis/MiniRaftCluster.java | 79 ++--- .../java/org/apache/ratis/RaftAsyncTests.java | 18 +- .../java/org/apache/ratis/RaftBasicTests.java | 4 +- .../java/org/apache/ratis/RaftTestUtil.java | 13 +- .../TestRaftServerLeaderElectionTimeout.java | 3 +- .../ratis/TestRaftServerSlownessDetection.java | 2 +- .../ratis/server/impl/LeaderElectionTests.java | 5 +- .../impl/RaftReconfigurationBaseTest.java | 3 +- .../ratis/server/impl/RaftServerTestUtil.java | 10 + .../server/impl/ReinitializationBaseTest.java | 11 +- .../server/simulation/SimulatedServerRpc.java | 10 +- .../statemachine/RaftSnapshotBaseTest.java | 3 +- .../ratis/statemachine/TestStateMachine.java | 28 +- 33 files changed, 426 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java new file mode 100644 index 0000000..85888a0 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * The corresponding object is already closed. + */ +public class AlreadyClosedException extends RaftException { + public AlreadyClosedException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index 915f4a2..c560bc6 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -20,6 +20,8 @@ package org.apache.ratis.util; +import org.slf4j.Logger; + import java.io.Closeable; import java.io.EOFException; import java.io.IOException; @@ -30,10 +32,10 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; -import org.slf4j.Logger; - /** * IO related utility methods. */ @@ -56,6 +58,18 @@ public interface IOUtils { return cause != null? asIOException(cause): new IOException(e); } + static <T> T getFromFuture(CompletableFuture<T> future, Object name) throws IOException { + try { + return future.get(); + } catch (InterruptedException e) { + throw toInterruptedIOException(name + " interrupted.", e); + } catch (ExecutionException e) { + throw toIOException(e); + } catch (CompletionException e) { + throw asIOException(JavaUtils.unwrapCompletionException(e)); + } + } + static boolean shouldReconnect(Exception e) { return ReflectionUtils.isInstance(e, SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 15cb4f6..dd8eb39 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; /** @@ -67,12 +68,20 @@ public interface JavaUtils { * wrap the checked exception by {@link RuntimeException}. */ static <T> T callAsUnchecked(Callable<T> callable) { + return callAsUnchecked(callable::call, RuntimeException::new); + } + + static <OUTPUT, THROWABLE extends Throwable> OUTPUT callAsUnchecked( + CheckedSupplier<OUTPUT, THROWABLE> checkedSupplier, + Function<THROWABLE, ? extends RuntimeException> converter) { try { - return callable.call(); - } catch (RuntimeException e) { + return checkedSupplier.get(); + } catch(RuntimeException | Error e) { throw e; - } catch (Exception e) { - throw new RuntimeException(e); + } catch(Throwable t) { + @SuppressWarnings("unchecked") + final THROWABLE casted = (THROWABLE)t; + throw converter.apply(casted); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java index 93af1ef..f8f3648 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java @@ -158,12 +158,13 @@ public class LifeCycle { } /** Assert if the current state equals to one of the expected states. */ - public <T extends Throwable> void assertCurrentState( + public <T extends Throwable> State assertCurrentState( BiFunction<String, State, T> newThrowable, State... expected) throws T { final State c = getCurrentState(); if (!c.isOneOf(expected)) { throw newThrowable.apply(name, c); } + return c; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java index f1d55b0..7af2201 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java @@ -72,4 +72,12 @@ public interface Preconditions { throw new IllegalStateException(String.valueOf(message.get())); } } + + static void assertNull(Object object, String name) { + if (object != null) { + throw new IllegalStateException( + name + " is expected to be null but " + + name + " = " + object + " != null, class = " + object.getClass()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java index 4b7ed40..ccbbda0 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -93,14 +93,14 @@ public class TestRestartRaftPeer extends BaseTest { long lastAppliedIndex = 0; for (int i = 0; i < 10 && !catchup; i++) { Thread.sleep(500); - lastAppliedIndex = cluster.getServer(followerId).getImpl().getState().getLastAppliedIndex(); + lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex(); catchup = lastAppliedIndex >= 20; } Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup); // make sure the restarted peer's log segments is correct cluster.restartServer(followerId, false); - Assert.assertTrue(cluster.getServer(followerId).getImpl().getState().getLog() + Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog() .getLastEntryTermIndex().getIndex() >= 20); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java index 3ee5e05..057c73a 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java @@ -51,7 +51,7 @@ public abstract class ParameterizedBaseTest extends BaseTest { private static final AtomicReference<MiniRaftCluster> currentCluster = new AtomicReference<>(); /** Set {@link #currentCluster} to the given cluster and start it if {@link #currentCluster} is changed. */ - public static void setAndStart(MiniRaftCluster cluster) throws InterruptedException { + public static void setAndStart(MiniRaftCluster cluster) throws InterruptedException, IOException { final MiniRaftCluster previous = currentCluster.getAndSet(cluster); if (previous != cluster) { if (previous != null) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index fefec48..b3e514c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -111,11 +111,17 @@ public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolCl } @Override - public void closeImpl() { - if (server != null) { - server.shutdown(); - } + public void closeImpl() throws IOException { + final String name = getId() + ": shutdown server with port " + server.getPort(); + LOG.info("{} now", name); + final Server s = server.shutdownNow(); super.closeImpl(); + try { + s.awaitTermination(); + } catch(InterruptedException e) { + throw IOUtils.toInterruptedIOException(name + " failed", e); + } + LOG.info("{} successfully", name); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 595e061..f060c24 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -303,7 +303,7 @@ public class GRpcLogAppender extends LogAppender { AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); if (request == null) { // If reply comes after timeout, the reply is ignored. - LOG.warn("Ignoring reply: " + reply); + LOG.warn("{}: Ignoring {}", server.getId(), reply); return; } Preconditions.assertTrue(request.hasPreviousLog()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index e5e95ee..7173e1f 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -51,8 +51,8 @@ public class TestRaftServerWithGrpc extends BaseTest { // the raft server proxy created earlier. Raft server proxy should close // the rpc server on failure. testFailureCase("start a new server with the same address", - () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null), - IOException.class, IOException.class, OverlappingFileLockException.class); + () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null).start(), + IOException.class, OverlappingFileLockException.class); // Try to start a raft server rpc at the leader address. cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId)); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index 5e571d4..5d07752 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -149,7 +149,7 @@ public class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftServerPro } @Override - public void closeImpl() { + public void closeImpl() throws IOException { ipcServer.stop(); super.closeImpl(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index ebe3184..45c0b77 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -134,7 +134,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, } @Override - public void closeImpl() { + public void closeImpl() throws IOException { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); final ChannelFuture f = getChannel().close(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index acd54cb..e071d4b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LifeCycle; import java.io.Closeable; import java.io.IOException; @@ -50,7 +51,9 @@ public interface RaftServer extends Closeable, RpcType.Get, ServerFactory getFactory(); /** Start this server. */ - void start(); + void start() throws IOException; + + LifeCycle.State getLifeCycleState(); /** @return a {@link Builder}. */ static Builder newBuilder() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java index 33db36d..a85e606 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java @@ -23,6 +23,7 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.protocol.RaftServerProtocol; import java.io.Closeable; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Objects; @@ -51,7 +52,7 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl } /** Start the RPC service. */ - void start(); + void start() throws IOException; /** @return the address where this RPC server is listening to. */ InetSocketAddress getInetSocketAddress(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index fd0ee4a..e33177a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -166,8 +166,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou this.role.transitionRole(newRole); } - void start() { - lifeCycle.transition(STARTING); + boolean start() { + if (!lifeCycle.compareAndTransition(NEW, STARTING)) { + return false; + } + LOG.info("{}: start {}", getId(), groupId); state.start(); RaftConfiguration conf = getRaftConf(); if (conf != null && conf.contains(getId())) { @@ -179,6 +182,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } registerMBean(getId(), getGroupId(), jmxAdapter, jmxAdapter); + return true; } static boolean registerMBean( @@ -235,6 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou void shutdown() { lifeCycle.checkStateAndClose(() -> { + LOG.info("{}: shutdown {}", getId(), groupId); try { jmxAdapter.unregister(); } catch (Exception ignored) { @@ -490,8 +495,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou peers.toArray(new RaftPeer[peers.size()])); } - private void assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException { - lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n + private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException { + return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n + " is not " + Arrays.toString(expected) + ": current state is " + c), expected); } @@ -866,7 +871,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou + ", commits" + ProtoUtils.toString(commitInfos) + ", entries: " + ServerProtoUtils.toString(entries)); - assertLifeCycleState(STARTING, RUNNING); + final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING); + if (currentState == STARTING) { + if (role.getCurrentRole() == null) { + throw new ServerNotReadyException("The role of Server " + getId() + " is not yet initialized."); + } + } assertGroup(leaderId, leaderGroupId); try { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 9db7735..27ec67f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -26,35 +26,132 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.CheckedFunction; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collections; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; public class RaftServerProxy implements RaftServer { public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class); + /** + * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures. + * + * The map is synchronized for mutations and the bulk {@link #getAll()} method + * but the (non-bulk) {@link #get(RaftGroupId)} and {@link #containsGroup(RaftGroupId)} methods are not. + * The thread safety and atomicity guarantees for the non-bulk methods are provided by {@link ConcurrentMap}. + */ + class ImplMap implements Closeable { + private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>(); + private boolean isClosed = false; + + synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) { + if (isClosed) { + return JavaUtils.completeExceptionally(new AlreadyClosedException( + getId() + ": Failed to add " + group + " since the server is already closed")); + } + if (containsGroup(group.getGroupId())) { + return JavaUtils.completeExceptionally(new AlreadyExistsException( + getId() + ": Failed to add " + group + " since the group already exists in the map.")); + } + final RaftGroupId groupId = group.getGroupId(); + final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group); + final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl); + Preconditions.assertNull(previous, "previous"); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl)); + } + return newImpl; + } + + synchronized CompletableFuture<RaftServerImpl> remove(RaftGroupId groupId) { + final CompletableFuture<RaftServerImpl> future = map.remove(groupId); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: remove {}", getId(), toString(groupId, future)); + } + return future; + } + + @Override + public synchronized void close() { + if (isClosed) { + LOG.info("{} is already closed.", getId()); + return; + } + isClosed = true; + map.values().parallelStream().map(CompletableFuture::join).forEach(RaftServerImpl::shutdown); + } + + synchronized List<CompletableFuture<RaftServerImpl>> getAll() { + return new ArrayList<>(map.values()); + } + + CompletableFuture<RaftServerImpl> get(RaftGroupId groupId) { + final CompletableFuture<RaftServerImpl> i = map.get(groupId); + if (i == null) { + return JavaUtils.completeExceptionally(new GroupMismatchException( + getId() + ": " + groupId + " not found.")); + } + return i; + } + + boolean containsGroup(RaftGroupId groupId) { + return map.containsKey(groupId); + } + + @Override + public synchronized String toString() { + if (map.isEmpty()) { + return "<EMPTY>"; + } else if (map.size() == 1) { + return toString(map.entrySet().iterator().next()); + } + final StringBuilder b = new StringBuilder("["); + map.entrySet().forEach(e -> b.append("\n ").append(toString(e))); + return b.append("] size=").append(map.size()).toString(); + } + + String toString(Map.Entry<RaftGroupId, CompletableFuture<RaftServerImpl>> e) { + return toString(e.getKey(), e.getValue()); + } + + String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) { + return "" + (f != null && f.isDone()? f.join(): groupId + ":" + f); + } + } + private final RaftPeerId id; private final RaftProperties properties; private final StateMachine.Registry stateMachineRegistry; + private final LifeCycle lifeCycle; private final RaftServerRpc serverRpc; private final ServerFactory factory; - private volatile CompletableFuture<RaftServerImpl> impl; + private final ImplMap impls = new ImplMap(); private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>(); RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry, - RaftGroup group, RaftProperties properties, Parameters parameters) - throws IOException { + RaftProperties properties, Parameters parameters) { this.properties = properties; this.stateMachineRegistry = stateMachineRegistry; @@ -63,23 +160,18 @@ public class RaftServerProxy implements RaftServer { this.serverRpc = factory.newRaftServerRpc(this); this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc)); + this.lifeCycle = new LifeCycle(this.id); + } - try { - this.impl = CompletableFuture.completedFuture(initImpl(group)); - } catch (IOException ioe) { + private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) { + return CompletableFuture.supplyAsync(() -> { try { - serverRpc.close(); - } catch (IOException closeIoe) { - LOG.warn(this.id + ": Failed to close server rpc.", closeIoe); - ioe.addSuppressed(closeIoe); - } finally { - throw ioe; + serverRpc.addPeers(group.getPeers()); + return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this); + } catch(IOException e) { + throw new CompletionException(getId() + ": Failed to initialize server for " + group, e); } - } - } - - private RaftServerImpl initImpl(RaftGroup group) throws IOException { - return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this); + }); } private static String getIdStringFrom(RaftServerRpc rpc) { @@ -98,8 +190,9 @@ public class RaftServerProxy implements RaftServer { return id; } + @Override public Iterable<RaftGroupId> getGroupIds() throws IOException { - return Collections.singleton(getImpl().getGroupId()); + return getImpls().stream().map(RaftServerImpl::getGroupId).collect(Collectors.toList()); } @Override @@ -121,54 +214,85 @@ public class RaftServerProxy implements RaftServer { return serverRpc; } - public RaftServerImpl getImpl() throws IOException { - final CompletableFuture<RaftServerImpl> i = impl; - if (i == null) { - throw new ServerNotReadyException(getId() + " is not initialized."); - } - try { - return i.get(); - } catch (InterruptedException e) { - throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e); - } catch (ExecutionException e) { - throw IOUtils.asIOException(e); + public boolean containsGroup(RaftGroupId groupId) { + return impls.containsGroup(groupId); + } + + CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) { + return impls.addNew(group); + } + + private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) { + return impls.get(groupId); + } + + private RaftServerImpl getImpl(RaftRpcRequestProto proto) throws IOException { + return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId())); + } + + RaftServerImpl getImpl(RaftGroupId groupId) throws IOException { + Objects.requireNonNull(groupId, "groupId == null"); + return IOUtils.getFromFuture(getImplFuture(groupId), getId()); + } + + List<RaftServerImpl> getImpls() throws IOException { + final List<RaftServerImpl> list = new ArrayList<>(); + for(CompletableFuture<RaftServerImpl> f : impls.getAll()) { + list.add(IOUtils.getFromFuture(f, getId())); } + return list; } @Override - public void start() { - LOG.info("{}: start", getId()); - JavaUtils.getAndConsume(impl, RaftServerImpl::start); - getServerRpc().start(); + public LifeCycle.State getLifeCycleState() { + return lifeCycle.getCurrentState(); + } + + @Override + public void start() throws IOException { + getImpls().parallelStream().forEach(RaftServerImpl::start); + + lifeCycle.startAndTransition(() -> { + LOG.info("{}: start RPC server", getId()); + getServerRpc().start(); + }, IOException.class); } @Override public void close() { - LOG.info("{}: close", getId()); - JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown); - try { - getServerRpc().close(); - } catch (IOException ignored) { - LOG.warn("Failed to close RPC server for " + getId(), ignored); - } + lifeCycle.checkStateAndClose(() -> { + LOG.info("{}: close", getId()); + impls.close(); + + try { + getServerRpc().close(); + } catch(IOException ignored) { + LOG.warn(getId() + ": Failed to close " + getRpcType() + " server", ignored); + } + }); + } + + private <REPLY> CompletableFuture<REPLY> submitRequest(RaftGroupId groupId, + CheckedFunction<RaftServerImpl, CompletableFuture<REPLY>, IOException> submitFunction) { + return getImplFuture(groupId).thenCompose( + impl -> JavaUtils.callAsUnchecked(() -> submitFunction.apply(impl), CompletionException::new)); } @Override - public CompletableFuture<RaftClientReply> submitClientRequestAsync( - RaftClientRequest request) throws IOException { - return getImpl().submitClientRequestAsync(request); + public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) { + return submitRequest(request.getRaftGroupId(), impl -> impl.submitClientRequestAsync(request)); } @Override public RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException { - return getImpl().submitClientRequest(request); + return getImpl(request.getRaftGroupId()).submitClientRequest(request); } @Override public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException { - return getImpl().setConfiguration(request); + return getImpl(request.getRaftGroupId()).setConfiguration(request); } @Override @@ -180,37 +304,29 @@ public class RaftServerProxy implements RaftServer { @Override public CompletableFuture<RaftClientReply> reinitializeAsync( ReinitializeRequest request) throws IOException { - LOG.info("{}: reinitializeAsync {}", getId(), request); - getImpl().assertGroup(request.getRequestorId(), request.getRaftGroupId()); + LOG.info("{}: reinitialize* {}", getId(), request); if (!reinitializeRequest.compareAndSet(null, request)) { throw new IOException("Another reinitialize is already in progress."); } - - return CompletableFuture.supplyAsync(() -> { - try { - final CompletableFuture<RaftServerImpl> oldImpl = impl; - impl = new CompletableFuture<>(); - JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown); - - final RaftServerImpl newImpl; - try { - newImpl = initImpl(request.getGroup()); - } catch (IOException ioe) { - final RaftException re = new RaftException( - "Failed to reinitialize, request=" + request, ioe); - impl.completeExceptionally(new IOException( - "Server " + getId() + " is not initialized.", re)); - return new RaftClientReply(request, re, null); - } - - getServerRpc().addPeers(request.getGroup().getPeers()); - newImpl.start(); - impl.complete(newImpl); - return new RaftClientReply(request, newImpl.getCommitInfos()); - } finally { - reinitializeRequest.set(null); - } - }); + final RaftGroupId oldGroupId = request.getRaftGroupId(); + return getImplFuture(oldGroupId) + .thenAcceptAsync(RaftServerImpl::shutdown) + .thenAccept(_1 -> impls.remove(oldGroupId)) + .thenCompose(_1 -> impls.addNew(request.getGroup())) + .thenApply(newImpl -> { + LOG.debug("{}: newImpl = {}", getId(), newImpl); + final boolean started = newImpl.start(); + Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl); + return new RaftClientReply(request, newImpl.getCommitInfos()); + }) + .whenComplete((_1, throwable) -> { + if (throwable != null) { + impls.remove(request.getGroup().getGroupId()); + LOG.warn(getId() + ": Failed reinitialize* " + request, throwable); + } + + reinitializeRequest.set(null); + }); } @Override @@ -223,48 +339,41 @@ public class RaftServerProxy implements RaftServer { @Override public CompletableFuture<ServerInformationReply> getInfoAsync( ServerInformationRequest request) { - return impl.thenApply(server -> server.getServerInformation(request)); + return getImplFuture(request.getRaftGroupId()).thenApplyAsync( + server -> server.getServerInformation(request)); } /** * Handle a raft configuration change request from client. */ @Override - public CompletableFuture<RaftClientReply> setConfigurationAsync( - SetConfigurationRequest request) throws IOException { - return getImpl().setConfigurationAsync(request); + public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) { + return submitRequest(request.getRaftGroupId(), impl -> impl.setConfigurationAsync(request)); } @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) - throws IOException { - return getImpl().requestVote(r); + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { + return getImpl(request.getServerRequest()).requestVote(request); } @Override - public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( - AppendEntriesRequestProto r) throws IOException { - return getImpl().appendEntriesAsync(r); + public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) { + final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId()); + return submitRequest(groupId, impl -> impl.appendEntriesAsync(request)); } @Override - public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) - throws IOException { - return getImpl().appendEntries(r); + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException { + return getImpl(request.getServerRequest()).appendEntries(request); } @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - return getImpl().installSnapshot(request); + public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { + return getImpl(request.getServerRequest()).installSnapshot(request); } @Override public String toString() { - try { - return getImpl().toString(); - } catch (IOException ignored) { - return getClass().getSimpleName() + ":" + getId(); - } + return getId() + String.format(":%9s ", lifeCycle.getCurrentState()) + impls; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java index aa8e2f7..68cf6fc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java @@ -25,6 +25,7 @@ import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.PeerProxyMap; import java.io.Closeable; +import java.io.IOException; import java.util.function.Function; import java.util.function.Supplier; @@ -64,18 +65,18 @@ public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES ex } @Override - public final void start() { - getLifeCycle().startAndTransition(() -> startImpl()); + public final void start() throws IOException { + getLifeCycle().startAndTransition(this::startImpl, IOException.class); } - public abstract void startImpl(); + public abstract void startImpl() throws IOException; @Override - public final void close() { + public final void close() throws IOException{ getLifeCycle().checkStateAndClose(() -> closeImpl()); } - public void closeImpl() { + public void closeImpl() throws IOException { getProxies().close(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java index e7a6e80..20ec951 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -59,4 +59,9 @@ public class RoleInfo { public boolean isLeader() { return role == RaftPeerRole.LEADER; } + + @Override + public String toString() { + return "" + role; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index d9e0ee9..7b80dcf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -34,27 +34,26 @@ public class ServerImplUtils { public static RaftServerProxy newRaftServer( RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties, Parameters parameters) throws IOException { - return newRaftServer(id, group, gid -> stateMachine, properties, parameters); + RaftServerProxy.LOG.debug("newRaftServer: {}, {}", id, group); + final RaftServerProxy proxy = newRaftServer(id, gid -> stateMachine, properties, parameters); + if (group != null) { + proxy.addGroup(group); + } + return proxy; } - public static RaftServerProxy newRaftServer( - RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, - RaftProperties properties, Parameters parameters) throws IOException { + private static RaftServerProxy newRaftServer( + RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) + throws IOException { final RaftServerProxy proxy; try { // attempt multiple times to avoid temporary bind exception proxy = JavaUtils.attempt( - () -> new RaftServerProxy(id, stateMachineRegistry, group, properties, parameters), + () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters), 5, 500L, "new RaftServerProxy", RaftServerProxy.LOG); } catch (InterruptedException e) { throw IOUtils.toInterruptedIOException( - "Interrupted when creating RaftServer " + id + ", " + group, e); - } catch (IOException e) { - throw new IOException("Failed to create RaftServer " + id + ", " + group, e); - } - // add peers into rpc service - if (!group.getPeers().isEmpty()) { - proxy.getServerRpc().addPeers(group.getPeers()); + "Interrupted when creating RaftServer " + id, e); } return proxy; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 0d2ec4c..296dce6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -77,7 +77,7 @@ public abstract class RaftLog implements Closeable { public void checkLogState() { Preconditions.assertTrue(isOpen, - "The RaftLog has not been opened or has been closed"); + () -> getSelfId() + ": The RaftLog has not been opened or has been closed"); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 3806bb8..8104938 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.storage.MemoryRaftLog; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; @@ -147,6 +148,7 @@ public abstract class MiniRaftCluster { protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { this.group = initRaftGroup(Arrays.asList(ids)); + LOG.info("new MiniRaftCluster {}", group); this.properties = new RaftProperties(properties); this.parameters = parameters; @@ -160,6 +162,7 @@ public abstract class MiniRaftCluster { } public MiniRaftCluster initServers() { + LOG.info("servers = " + servers); if (servers.isEmpty()) { putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true); } @@ -183,7 +186,7 @@ public abstract class MiniRaftCluster { .collect(Collectors.toList()); } - public void start() { + public void start() throws IOException { LOG.info(".............................................................. "); LOG.info("... "); LOG.info("... Starting " + getClass().getSimpleName()); @@ -191,7 +194,7 @@ public abstract class MiniRaftCluster { LOG.info(".............................................................. "); initServers(); - servers.values().forEach(RaftServer::start); + startServers(servers.values()); } /** @@ -201,7 +204,7 @@ public abstract class MiniRaftCluster { killServer(newId); servers.remove(newId); - startServer(putNewServer(newId, format), true); + putNewServer(newId, format).start(); } public void restart(boolean format) throws IOException { @@ -217,8 +220,8 @@ public abstract class MiniRaftCluster { return RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); } - private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, - boolean format) { + private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean format) { + LOG.info("newRaftServer: {}, {}, format? {}", id, group, format); try { final File dir = getStorageDir(id); if (format) { @@ -280,14 +283,14 @@ public abstract class MiniRaftCluster { return addNewPeers(generateIds(number, servers.size()), startNewPeer); } - public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) { + public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException { LOG.info("Add new peers {}", Arrays.asList(ids)); // create and add new RaftServers final Collection<RaftServerProxy> newServers = putNewServers( CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true); - newServers.forEach(s -> startServer(s, true)); + startServers(newServers); if (!startNewPeer) { // start and then close, in order to bind the port newServers.forEach(p -> p.close()); @@ -301,16 +304,12 @@ public abstract class MiniRaftCluster { return new PeerChanges(p, np, new RaftPeer[0]); } - protected void startServer(RaftServer server, boolean startService) { - if (startService) { - server.start(); + static void startServers(Iterable<? extends RaftServer> servers) throws IOException { + for(RaftServer s : servers) { + s.start(); } } - public void startServer(RaftPeerId id) { - startServer(getServer(id), true); - } - /** * prepare the peer list when removing some peers from the conf */ @@ -356,18 +355,7 @@ public abstract class MiniRaftCluster { } else { b.append("ALL groups"); } - getServers().stream().filter( - s -> { - if (groupId == null) { - return true; - } - try { - return groupId.equals(s.getImpl().getGroupId()); - } catch (IOException e) { - return false; - } - }) - .forEach(s -> b.append("\n ").append(s)); + getRaftServerProxyStream(groupId).forEach(s -> b.append("\n ").append(s)); return b.toString(); } @@ -407,11 +395,7 @@ public abstract class MiniRaftCluster { } public RaftServerImpl getLeader(RaftGroupId groupId) { - Stream<RaftServerImpl> stream = getServerAliveStream(); - if (groupId != null) { - stream = stream.filter(s -> groupId.equals(s.getGroupId())); - } - return getLeader(stream); + return getLeader(getServerAliveStream(groupId)); } static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) { @@ -439,7 +423,7 @@ public abstract class MiniRaftCluster { return leaders.get(0); } - boolean isLeader(String leaderId) throws InterruptedException { + boolean isLeader(String leaderId) { final RaftServerImpl leader = getLeader(); return leader != null && leader.getId().toString().equals(leaderId); } @@ -454,24 +438,41 @@ public abstract class MiniRaftCluster { return servers.values(); } - public Iterable<RaftServerImpl> iterateServerImpls() { - return CollectionUtils.as(getServers(), RaftTestUtil::getImplAsUnchecked); + private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) { + return getServers().stream() + .filter(s -> groupId == null || s.containsGroup(groupId)); } - public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) { - return servers.stream().map(RaftTestUtil::getImplAsUnchecked); + public Iterable<RaftServerImpl> iterateServerImpls() { + return CollectionUtils.as(getServers(), this::getRaftServerImpl); } - public Stream<RaftServerImpl> getServerStream() { - return getServerStream(getServers()); + + private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) { + final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId); + return groupId != null? stream.map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId)) + : stream.flatMap(s -> RaftServerTestUtil.getRaftServerImpls(s).stream()); } + public Stream<RaftServerImpl> getServerAliveStream() { - return getServerStream(getServers()).filter(RaftServerImpl::isAlive); + return getServerAliveStream(getGroupId()); + } + + private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) { + return getServerStream(groupId).filter(RaftServerImpl::isAlive); } public RaftServerProxy getServer(RaftPeerId id) { return servers.get(id); } + public RaftServerImpl getRaftServerImpl(RaftPeerId id) { + return getRaftServerImpl(servers.get(id)); + } + + private RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) { + return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId()); + } + public List<RaftPeer> getPeers() { return toRaftPeers(getServers()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index b297a27..3d80f5e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto; @@ -90,6 +91,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba } } + static void setBlockTransaction(boolean block, MiniRaftCluster cluster) throws InterruptedException { + for (RaftServerProxy server : cluster.getServers()) { + final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId()); + ((SimpleStateMachine4Testing)impl.getStateMachine()).setBlockTransaction(block); + } + } + @Test public void testAsyncRequestSemaphore() throws Exception { LOG.info("Running testAsyncRequestSemaphore"); @@ -103,9 +111,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages); final RaftClient client = cluster.createClient(); //Set blockTransaction flag so that transaction blocks - for (RaftServerProxy server : cluster.getServers()) { - ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(true); - } + setBlockTransaction(true, cluster); //Send numMessages which are blocked and do not release the client semaphore permits AtomicInteger blockedRequestsCount = new AtomicInteger(); @@ -133,9 +139,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1); //Unset the blockTransaction flag so that semaphore permits can be released - for (RaftServerProxy server : cluster.getServers()) { - ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(false); - } + setBlockTransaction(false, cluster); for(int i=0; i<=numMessages; i++){ futures[i].join(); } @@ -148,7 +152,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba try { cluster.start(); waitForLeader(cluster); - RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 1000, cluster, LOG); + RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 100, cluster, LOG); } finally { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 8c744ab..8c0def9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -176,7 +176,7 @@ public abstract class RaftBasicTests extends BaseTest { LOG.info(cluster.printAllLogs()); for(RaftServerProxy server : cluster.getServers()) { - final RaftServerImpl impl = server.getImpl(); + final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId()); if (impl.isAlive() || replication == ReplicationLevel.ALL) { JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages), 5, 1000, impl.getId() + " assertLogEntries", LOG); @@ -420,7 +420,7 @@ public abstract class RaftBasicTests extends BaseTest { if (c.exceptionInClientThread.get() != null) { throw new AssertionError(c.exceptionInClientThread.get()); } - RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages); + RaftTestUtil.assertLogEntries(cluster, c.messages); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 53a051a..8667f50 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -52,13 +52,8 @@ import java.util.function.Predicate; public interface RaftTestUtil { - Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class); - static RaftServerImpl getImplAsUnchecked(RaftServerProxy proxy) { - return JavaUtils.callAsUnchecked(proxy::getImpl); - } - static RaftServerImpl waitForLeader(MiniRaftCluster cluster) throws InterruptedException { return waitForLeader(cluster, false); @@ -156,11 +151,9 @@ public interface RaftTestUtil { } } - static void assertLogEntries(Collection<RaftServerProxy> servers, - SimpleMessage... expectedMessages) { - final int size = servers.size(); - final long count = MiniRaftCluster.getServerStream(servers) - .filter(RaftServerImpl::isAlive) + static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage... expectedMessages) { + final int size = cluster.getServers().size(); + final long count = cluster.getServerAliveStream() .map(s -> s.getState().getLog()) .filter(log -> logEntriesContains(log, expectedMessages)) .count(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java index 67156a1..04a2a8d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.util.concurrent.TimeUnit; /** @@ -60,7 +61,7 @@ public class TestRaftServerLeaderElectionTimeout extends BaseTest { } @Before - public void setup() { + public void setup() throws IOException { Assert.assertNull(cluster.getLeader()); cluster.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java index 811e7de..51af8af 100644 --- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java @@ -64,7 +64,7 @@ public class TestRaftServerSlownessDetection extends BaseTest { } @Before - public void setup() { + public void setup() throws IOException { Assert.assertNull(cluster.getLeader()); cluster.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 4886bcc..54fbf5f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -101,7 +101,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> } final RaftServerImpl leader = waitForLeader(cluster); - final TimeDuration sleepTime = TimeDuration.valueOf(5, TimeUnit.SECONDS); + final TimeDuration sleepTime = TimeDuration.valueOf(3, TimeUnit.SECONDS); LOG.info("sleep " + sleepTime); sleepTime.sleep(); @@ -109,8 +109,9 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> final RaftServerProxy lastServer = i.next(); lastServer.start(); final RaftPeerId lastServerLeaderId = JavaUtils.attempt( - () -> getLeader(lastServer.getImpl().getState()), + () -> getLeader(lastServer.getImpls().iterator().next().getState()), 10, 1000, "getLeaderId", LOG); + LOG.info(cluster.printServers()); Assert.assertEquals(leader.getId(), lastServerLeaderId); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 8a340d7..79017d4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -321,8 +321,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { final RaftLog leaderLog = cluster.getLeader().getState().getLog(); for (RaftPeer newPeer : c1.newPeers) { Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE), - cluster.getServer(newPeer.getId()) - .getImpl().getState().getLog() + cluster.getRaftServerImpl(newPeer.getId()).getState().getLog() .getEntries(0, Long.MAX_VALUE)); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index a72e6f5..a4ec715 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -19,6 +19,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.util.JavaUtils; import org.junit.Assert; @@ -26,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.List; import java.util.stream.Stream; public class RaftServerTestUtil { @@ -87,4 +89,12 @@ public class RaftServerTestUtil { public static Logger getStateMachineUpdaterLog() { return StateMachineUpdater.LOG; } + + public static List<RaftServerImpl> getRaftServerImpls(RaftServerProxy proxy) { + return JavaUtils.callAsUnchecked(proxy::getImpls); + } + + public static RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) { + return JavaUtils.callAsUnchecked(() -> proxy.getImpl(groupId)); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java index 251a4d3..f6e417c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java @@ -48,6 +48,7 @@ public abstract class ReinitializationBaseTest extends BaseTest { static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class); { + LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } @@ -66,7 +67,7 @@ public abstract class ReinitializationBaseTest extends BaseTest { LOG.info("Start testReinitialize" + cluster.printServers()); // Start server with an empty conf - final RaftGroupId groupId = RaftGroupId.randomId(); + final RaftGroupId groupId = cluster.getGroupId(); final RaftGroup group = new RaftGroup(groupId); final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0)) @@ -75,10 +76,10 @@ public abstract class ReinitializationBaseTest extends BaseTest { LOG.info("putNewServer: " + cluster.printServers()); cluster.start(); - LOG.info("start: " + cluster.printServers()); // Make sure that there are no leaders. TimeUnit.SECONDS.sleep(1); + LOG.info("start: " + cluster.printServers()); Assert.assertNull(cluster.getLeader()); // Reinitialize servers @@ -128,18 +129,20 @@ public abstract class ReinitializationBaseTest extends BaseTest { LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers()); // Start server with an empty conf - final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.randomId()); + final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId()); final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0)) .map(RaftPeerId::valueOf).collect(Collectors.toList()); + LOG.info("ids: " + ids); ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true)); LOG.info("putNewServer: " + cluster.printServers()); + TimeUnit.SECONDS.sleep(1); cluster.start(); - LOG.info("start: " + cluster.printServers()); // Make sure that there are no leaders. TimeUnit.SECONDS.sleep(1); + LOG.info("start: " + cluster.printServers()); Assert.assertNull(cluster.getLeader()); // Reinitialize servers to three groups http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 1be6c3a..c4a9a5e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.server.simulation; -import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; @@ -25,6 +24,8 @@ import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +126,7 @@ class SimulatedServerRpc implements RaftServerRpc { = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() { @Override public boolean isAlive() { - return RaftTestUtil.getImplAsUnchecked(server).isAlive(); + return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED); } @Override @@ -147,7 +148,7 @@ class SimulatedServerRpc implements RaftServerRpc { = new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() { @Override public boolean isAlive() { - return RaftTestUtil.getImplAsUnchecked(server).isAlive(); + return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED); } @Override @@ -168,7 +169,8 @@ class SimulatedServerRpc implements RaftServerRpc { future.whenCompleteAsync((reply, exception) -> { try { - final IOException e = IOUtils.asIOException(exception); + final IOException e = exception == null? null + : IOUtils.asIOException(JavaUtils.unwrapCompletionException(exception)); clientHandler.getRpc().sendReply(request, reply, e); } catch (IOException e) { LOG.warn("Failed to send reply {} for request {} due to exception {}", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index ef018e5..cf0f611 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.List; public abstract class RaftSnapshotBaseTest extends BaseTest { @@ -82,7 +83,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { public abstract MiniRaftCluster.Factory<?> getFactory(); @Before - public void setup() { + public void setup() throws IOException { final RaftProperties prop = new RaftProperties(); prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index a045ecd..07039b1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -27,7 +27,6 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.impl.TransactionContextImpl; @@ -68,31 +67,19 @@ public class TestStateMachine extends BaseTest { @Before public void setup() throws IOException { - } + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class); - private void startCluster() { - cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster( - NUM_SERVERS, properties); - Assert.assertNull(getCluster().getLeader()); - getCluster().start(); + cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(NUM_SERVERS, properties); + cluster.start(); } @After public void tearDown() { - final MiniRaftCluster cluster = getCluster(); if (cluster != null) { cluster.shutdown(); } } - public MiniRaftClusterWithSimulatedRpc getCluster() { - return cluster; - } - - public RaftProperties getProperties() { - return properties; - } - static class SMTransactionContext extends SimpleStateMachine4Testing { public static SMTransactionContext get(RaftServerImpl s) { return (SMTransactionContext)s.getStateMachine(); @@ -149,11 +136,6 @@ public class TestStateMachine extends BaseTest { @Test public void testTransactionContextIsPassedBack() throws Throwable { // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM - properties.setClass( - MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SMTransactionContext.class, StateMachine.class); - startCluster(); - int numTrx = 100; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx); try(final RaftClient client = cluster.createClient()) { @@ -165,8 +147,8 @@ public class TestStateMachine extends BaseTest { // TODO: there eshould be a better way to ensure all data is replicated and applied Thread.sleep(cluster.getMaxTimeout() + 100); - for (RaftServerProxy raftServer : cluster.getServers()) { - final SMTransactionContext sm = SMTransactionContext.get(raftServer.getImpl()); + for (RaftServerImpl raftServer : cluster.iterateServerImpls()) { + final SMTransactionContext sm = SMTransactionContext.get(raftServer); sm.rethrowIfException(); assertEquals(numTrx, sm.numApplied.get()); }
