Repository: incubator-ratis Updated Branches: refs/heads/master 2fbbe0aa9 -> 341456e04
RATIS-87. Separate RaftServerImpl into proxy and impl. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/341456e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/341456e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/341456e0 Branch: refs/heads/master Commit: 341456e04493fa014deac8cae803d78cb7f247a8 Parents: 2fbbe0a Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Wed May 10 13:38:51 2017 -0700 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Wed May 10 13:38:51 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/TestRestartRaftPeer.java | 6 +- .../TestRaftStateMachineException.java | 2 +- .../java/org/apache/ratis/grpc/GrpcFactory.java | 3 +- .../ratis/grpc/server/GRpcLogAppender.java | 3 +- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 5 +- .../apache/ratis/hadooprpc/HadoopFactory.java | 3 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 4 +- .../org/apache/ratis/netty/NettyFactory.java | 3 +- .../ratis/netty/MiniRaftClusterWithNetty.java | 7 +- .../ratis/server/impl/LeaderElection.java | 2 +- .../apache/ratis/server/impl/LeaderState.java | 9 +- .../apache/ratis/server/impl/LogAppender.java | 2 +- .../ratis/server/impl/RaftServerImpl.java | 204 ++++++++----------- .../ratis/server/impl/RaftServerProxy.java | 167 +++++++++++++++ .../apache/ratis/server/impl/ServerFactory.java | 3 +- .../ratis/server/impl/ServerImplUtils.java | 4 +- .../java/org/apache/ratis/MiniRaftCluster.java | 118 +++++------ .../java/org/apache/ratis/RaftBasicTests.java | 2 +- .../org/apache/ratis/RaftRetryCacheTests.java | 2 +- .../java/org/apache/ratis/RaftTestUtil.java | 12 +- .../impl/RaftReconfigurationBaseTest.java | 10 +- .../ratis/server/impl/RaftServerTestUtil.java | 7 +- .../MiniRaftClusterWithSimulatedRpc.java | 3 +- .../ratis/server/simulation/SimulatedRpc.java | 3 +- .../server/simulation/SimulatedServerRpc.java | 7 +- .../statemachine/RaftSnapshotBaseTest.java | 4 +- .../SimpleStateMachine4Testing.java | 4 +- .../ratis/statemachine/TestStateMachine.java | 8 +- 28 files changed, 375 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java index dc38e82..b5a92c2 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -88,7 +88,7 @@ public class TestRestartRaftPeer { } // restart a follower - String followerId = cluster.getFollowers().get(0).getId().toString(); + RaftPeerId followerId = cluster.getFollowers().get(0).getId(); LOG.info("Restart follower {}", followerId); cluster.restartServer(followerId, false); @@ -103,14 +103,14 @@ public class TestRestartRaftPeer { long lastAppliedIndex = 0; for (int i = 0; i < 10 && !catchup; i++) { Thread.sleep(500); - lastAppliedIndex = cluster.getServer(followerId).getState().getLastAppliedIndex(); + lastAppliedIndex = cluster.getServer(followerId).getImpl().getState().getLastAppliedIndex(); catchup = lastAppliedIndex >= 20; } Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup); // make sure the restarted peer's log segments is correct cluster.restartServer(followerId, false); - Assert.assertTrue(cluster.getServer(followerId).getState().getLog() + Assert.assertTrue(cluster.getServer(followerId).getImpl().getState().getLog() .getLastEntryTermIndex().getIndex() >= 20); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index 3dbd248..f049cda 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -138,7 +138,7 @@ public class TestRaftStateMachineException { long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex(); // make sure retry cache has the entry - for (RaftServerImpl server : cluster.getServers()) { + for (RaftServerImpl server : cluster.iterateServerImpls()) { LOG.info("check server " + server.getId()); if (server.getState().getLastAppliedIndex() < leaderApplied) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index 60d6ef6..16f5c1b 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -22,6 +22,7 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.grpc.client.GrpcClientRpc; import org.apache.ratis.grpc.server.GRpcLogAppender; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.*; public class GrpcFactory implements ServerFactory, ClientFactory { @@ -39,7 +40,7 @@ public class GrpcFactory implements ServerFactory, ClientFactory { } @Override - public RaftGRpcService newRaftServerRpc(RaftServerImpl server) { + public RaftGRpcService newRaftServerRpc(RaftServer server) { return RaftGRpcService.newBuilder() .setServer(server) .build(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 92dd257..2d507be 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -64,7 +64,8 @@ public class GRpcLogAppender extends LogAppender { RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc(); client = rpcService.getRpcClient(f.getPeer()); - maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(server.getProperties()); + maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax( + server.getProxy().getProperties()); pendingRequests = new ConcurrentLinkedQueue<>(); appendResponseHandler = new AppendLogResponseHandler(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index c3ca707..1cf55ad 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -23,6 +23,7 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.*; import org.apache.ratis.statemachine.StateMachine; @@ -47,7 +48,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } @Override - protected RaftServerImpl newRaftServer( + protected RaftServerProxy newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { GrpcConfigKeys.Server.setPort(properties, getPort(id, conf)); @@ -55,7 +56,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } @Override - protected void startServer(RaftServerImpl server, boolean startService) { + protected void startServer(RaftServer server, boolean startService) { final String id = server.getId().toString(); if (startService) { server.start(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java index 419bfaa..f6c4885 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -23,6 +23,7 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.hadooprpc.client.HadoopClientRpc; import org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; @@ -53,7 +54,7 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa } @Override - public HadoopRpcService newRaftServerRpc(RaftServerImpl server) { + public HadoopRpcService newRaftServerRpc(RaftServer server) { return HadoopRpcService.newBuilder() .setServer(server) .setConf(getConf()) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index 7c11f8e..c52c81d 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -27,7 +27,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftConfiguration; -import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.statemachine.StateMachine; import org.slf4j.Logger; @@ -72,7 +72,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { } @Override - protected RaftServerImpl newRaftServer( + protected RaftServerProxy newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { final Configuration hconf = new Configuration(hadoopConf); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java index 6dcfd15..31f127b 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -22,6 +22,7 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.netty.client.NettyClientRpc; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; @@ -34,7 +35,7 @@ public class NettyFactory extends ServerFactory.BaseFactory implements ClientFac } @Override - public NettyRpcService newRaftServerRpc(RaftServerImpl server) { + public NettyRpcService newRaftServerRpc(RaftServer server) { return NettyRpcService.newBuilder().setServer(server).build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 126624d..02a7493 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -24,10 +24,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.RaftConfiguration; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ServerImplUtils; +import org.apache.ratis.server.impl.*; import org.apache.ratis.statemachine.StateMachine; import java.io.IOException; @@ -50,7 +47,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { } @Override - protected RaftServerImpl newRaftServer( + protected RaftServerProxy newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { NettyConfigKeys.Server.setPort(properties, getPort(id, conf)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 14a3780..d5d5eb0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -164,7 +164,7 @@ class LeaderElection extends Daemon { case SHUTDOWN: LOG.info("{} received shutdown response when requesting votes.", server.getId()); - server.close(); + server.getProxy().close(); return; case REJECTED: case DISCOVERED_A_NEW_TERM: http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index c4f92db..4c19b7e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -112,8 +112,7 @@ public class LeaderState { senders = new ArrayList<>(others.size()); for (RaftPeer p : others) { - FollowerInfo f = new FollowerInfo(p, t, placeHolderIndex, true); - senders.add(server.getFactory().newLogAppender(server, this, f)); + senders.add(server.newLogAppender(this, p, t, placeHolderIndex, true)); } voterLists = divideFollowers(conf); } @@ -236,8 +235,7 @@ public class LeaderState { final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); final long nextIndex = raftLog.getNextIndex(); for (RaftPeer peer : newMembers) { - FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false); - LogAppender sender = server.getFactory().newLogAppender(server, this, f); + LogAppender sender = server.newLogAppender(this, peer, t, nextIndex, false); senders.add(sender); sender.start(); } @@ -450,7 +448,8 @@ public class LeaderState { } // the pending request handler will send NotLeaderException for // pending client requests when it stops - server.close(); + // TODO should close impl instead of proxy + server.getProxy().close(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index a48d236..ff071d5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -67,7 +67,7 @@ public class LogAppender extends Daemon { this.leaderState = leaderState; this.raftLog = server.getState().getLog(); - final RaftProperties properties = server.getProperties(); + final RaftProperties properties = server.getProxy().getProperties(); this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt(); this.batchSending = RaftServerConfigKeys.Log.Appender.batchEnabled(properties); this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index dca8b08..4ba7a10 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,72 +17,39 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration; -import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; -import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; -import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; -import static org.apache.ratis.util.LifeCycle.State.CLOSED; -import static org.apache.ratis.util.LifeCycle.State.CLOSING; -import static org.apache.ratis.util.LifeCycle.State.RUNNING; -import static org.apache.ratis.util.LifeCycle.State.STARTING; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.OptionalLong; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.LeaderNotReadyException; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftException; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.ReconfigurationInProgressException; -import org.apache.ratis.protocol.SetConfigurationRequest; -import org.apache.ratis.protocol.StateMachineException; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.server.RaftServer; +import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.com.google.common.base.Supplier; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.util.CodeInjectionForTesting; -import org.apache.ratis.util.IOUtils; -import org.apache.ratis.util.LifeCycle; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ProtoUtils; -import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RaftServerImpl implements RaftServer { +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration; +import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; +import static org.apache.ratis.util.LifeCycle.State.*; + +public class RaftServerImpl implements RaftServerProtocol, + RaftClientProtocol, RaftClientAsynchronousProtocol { public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class); private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName(); @@ -96,13 +63,12 @@ public class RaftServerImpl implements RaftServer { LEADER, CANDIDATE, FOLLOWER } + private final RaftServerProxy proxy; private final int minTimeoutMs; private final int maxTimeoutMs; private final LifeCycle lifeCycle; private final ServerState state; - private final StateMachine stateMachine; - private final RaftProperties properties; private volatile Role role; /** used when the peer is follower, to monitor election timeout */ @@ -114,28 +80,19 @@ public class RaftServerImpl implements RaftServer { /** used when the peer is leader */ private volatile LeaderState leaderState; - private final RaftServerRpc serverRpc; - - private final ServerFactory factory; - private final RetryCache retryCache; - RaftServerImpl(RaftPeerId id, StateMachine stateMachine, - RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) + RaftServerImpl(RaftPeerId id, RaftServerProxy proxy, + RaftConfiguration raftConf, RaftProperties properties) throws IOException { this.lifeCycle = new LifeCycle(id); minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs, "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); - this.properties = properties; - this.stateMachine = stateMachine; - this.state = new ServerState(id, raftConf, properties, this, stateMachine); - - final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); - this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters)); - this.serverRpc = initRaftServerRpc(); - retryCache = initRetryCache(properties); + this.proxy = proxy; + this.state = new ServerState(id, raftConf, properties, this, proxy.getStateMachine()); + this.retryCache = initRetryCache(properties); } private RetryCache initRetryCache(RaftProperties prop) { @@ -144,15 +101,13 @@ public class RaftServerImpl implements RaftServer { return new RetryCache(capacity, expireTime); } - @Override - public RpcType getRpcType() { - return getFactory().getRpcType(); + LogAppender newLogAppender( + LeaderState state, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, + boolean attendVote) { + final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, attendVote); + return getProxy().getFactory().newLogAppender(this, state, f); } - @Override - public ServerFactory getFactory() { - return factory; - } int getMinTimeoutMs() { return minTimeoutMs; @@ -167,9 +122,8 @@ public class RaftServerImpl implements RaftServer { maxTimeoutMs - minTimeoutMs + 1); } - @Override - public StateMachine getStateMachine() { - return this.stateMachine; + StateMachine getStateMachine() { + return proxy.getStateMachine(); } @VisibleForTesting @@ -177,22 +131,15 @@ public class RaftServerImpl implements RaftServer { return retryCache; } - private RaftServerRpc initRaftServerRpc() { - final RaftServerRpc rpc = getFactory().newRaftServerRpc(this); - // add peers into rpc service - RaftConfiguration conf = getRaftConf(); - if (conf != null) { - rpc.addPeers(conf.getPeers()); - } - return rpc; + public RaftServerProxy getProxy() { + return proxy; } public RaftServerRpc getServerRpc() { - return serverRpc; + return proxy.getServerRpc(); } - @Override - public void start() { + void start() { lifeCycle.transition(STARTING); state.start(); RaftConfiguration conf = getRaftConf(); @@ -213,7 +160,6 @@ public class RaftServerImpl implements RaftServer { heartbeatMonitor = new FollowerState(this); heartbeatMonitor.start(); - getServerRpc().start(); lifeCycle.transition(RUNNING); } @@ -225,14 +171,12 @@ public class RaftServerImpl implements RaftServer { private void startInitializing() { role = Role.FOLLOWER; // do not start heartbeatMonitoring - getServerRpc().start(); } public ServerState getState() { - return this.state; + return state; } - @Override public RaftPeerId getId() { return getState().getSelfId(); } @@ -241,18 +185,27 @@ public class RaftServerImpl implements RaftServer { return getState().getRaftConf(); } - @Override - public void close() { + void shutdown() { lifeCycle.checkStateAndClose(() -> { try { shutdownHeartbeatMonitor(); + } catch (Exception ignored) { + LOG.warn("Failed to shutdown heartbeat monitor for " + getId(), ignored); + } + try{ shutdownElectionDaemon(); + } catch (Exception ignored) { + LOG.warn("Failed to shutdown election daemon for " + getId(), ignored); + } + try{ shutdownLeaderState(); - - getServerRpc().close(); + } catch (Exception ignored) { + LOG.warn("Failed to shutdown leader state monitor for " + getId(), ignored); + } + try{ state.close(); } catch (Exception ignored) { - LOG.warn("Failed to kill " + state.getSelfId(), ignored); + LOG.warn("Failed to close state for " + getId(), ignored); } }); } @@ -335,7 +288,7 @@ public class RaftServerImpl implements RaftServer { role = Role.LEADER; state.becomeLeader(); // start sending AppendEntries RPC to followers - leaderState = new LeaderState(this, properties); + leaderState = new LeaderState(this, getProxy().getProperties()); leaderState.start(); } @@ -399,8 +352,7 @@ public class RaftServerImpl implements RaftServer { if (leaderId == null || leaderId.equals(state.getSelfId())) { // No idea about who is the current leader. Or the peer is the current // leader, but it is about to step down - RaftPeer suggestedLeader = state.getRaftConf() - .getRandomPeer(state.getSelfId()); + RaftPeer suggestedLeader = getRaftConf().getRandomPeer(state.getSelfId()); leaderId = suggestedLeader == null ? null : suggestedLeader.getId(); } RaftConfiguration conf = getRaftConf(); @@ -456,6 +408,7 @@ public class RaftServerImpl implements RaftServer { } // let the state machine handle read-only request from client + final StateMachine stateMachine = getStateMachine(); if (request.isReadOnly()) { // TODO: We might not be the leader anymore by the time this completes. // See the RAFT paper section 8 (last part) @@ -680,17 +633,29 @@ public class RaftServerImpl implements RaftServer { entries); } + static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) { + if (isHeartbeat) { + if (LOG.isTraceEnabled()) { + LOG.trace(message.get()); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(message.get()); + } + } + } + private AppendEntriesReplyProto appendEntries(RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException { CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, - entries); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, - ServerProtoUtils.toString(entries)); - } + leaderId, leaderTerm, previous, leaderCommit, initializing, entries); + final boolean isHeartbeat = entries.length == 0; + logAppendEntries(isHeartbeat, + () -> getId() + ": receive appendEntries(" + leaderId + ", " + + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + + initializing + ServerProtoUtils.toString(entries)); + lifeCycle.assertCurrentState(STARTING, RUNNING); try { @@ -734,8 +699,10 @@ public class RaftServerImpl implements RaftServer { final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); - LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", - getId(), previous, ServerProtoUtils.toString(reply)); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", + getId(), previous, ServerProtoUtils.toString(reply)); + } return reply; } @@ -761,13 +728,14 @@ public class RaftServerImpl implements RaftServer { } final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( leaderId, getId(), currentTerm, nextIndex, SUCCESS); - LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(), - ServerProtoUtils.toString(reply)); + logAppendEntries(isHeartbeat, + () -> getId() + ": succeeded to handle AppendEntries. Reply: " + + ServerProtoUtils.toString(reply)); return reply; } private boolean containPrevious(TermIndex previous) { - LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}", + LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}", getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); return state.getLog().contains(previous) || (state.getLatestSnapshot() != null @@ -922,6 +890,7 @@ public class RaftServerImpl implements RaftServer { } public void applyLogToStateMachine(LogEntryProto next) { + final StateMachine stateMachine = getStateMachine(); if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { // the reply should have already been set. only need to record // the new conf in the state machine. @@ -943,9 +912,4 @@ public class RaftServerImpl implements RaftServer { replyPendingRequest(next, stateMachineFuture); } } - - @Override - public RaftProperties getProperties() { - return this.properties; - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java new file mode 100644 index 0000000..36adf11 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.*; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +public class RaftServerProxy implements RaftServer { + public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class); + + private final RaftPeerId id; + private final RaftServerImpl impl; + private final StateMachine stateMachine; + private final RaftProperties properties; + + private final RaftServerRpc serverRpc; + private final ServerFactory factory; + + RaftServerProxy(RaftPeerId id, StateMachine stateMachine, + RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) + throws IOException { + this.id = id; + this.properties = properties; + this.stateMachine = stateMachine; + + final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); + this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters)); + this.impl = new RaftServerImpl(id, this, raftConf, properties); + this.serverRpc = initRaftServerRpc(factory, this, raftConf); + } + + private static RaftServerRpc initRaftServerRpc( + ServerFactory factory, RaftServer server, RaftConfiguration raftConf) { + final RaftServerRpc rpc = factory.newRaftServerRpc(server); + // add peers into rpc service + if (raftConf != null) { + rpc.addPeers(raftConf.getPeers()); + } + return rpc; + } + + @Override + public RpcType getRpcType() { + return getFactory().getRpcType(); + } + + @Override + public ServerFactory getFactory() { + return factory; + } + + @Override + public StateMachine getStateMachine() { + return stateMachine; + } + + @Override + public RaftProperties getProperties() { + return properties; + } + + public RaftServerRpc getServerRpc() { + return serverRpc; + } + + public RaftServerImpl getImpl() { + return impl; + } + + @Override + public void start() { + getImpl().start(); + getServerRpc().start(); + } + + @Override + public RaftPeerId getId() { + return id; + } + + @Override + public void close() { + getImpl().shutdown(); + try { + getServerRpc().close(); + } catch (IOException ignored) { + LOG.warn("Failed to close RPC server for " + getId(), ignored); + } + } + + @Override + public CompletableFuture<RaftClientReply> submitClientRequestAsync( + RaftClientRequest request) throws IOException { + return getImpl().submitClientRequestAsync(request); + } + + @Override + public RaftClientReply submitClientRequest(RaftClientRequest request) + throws IOException { + return getImpl().submitClientRequest(request); + } + + @Override + public RaftClientReply setConfiguration(SetConfigurationRequest request) + throws IOException { + return getImpl().setConfiguration(request); + } + + /** + * Handle a raft configuration change request from client. + */ + @Override + public CompletableFuture<RaftClientReply> setConfigurationAsync( + SetConfigurationRequest request) throws IOException { + return getImpl().setConfigurationAsync(request); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) + throws IOException { + return getImpl().requestVote(r); + } + + @Override + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) + throws IOException { + return getImpl().appendEntries(r); + } + + @Override + public InstallSnapshotReplyProto installSnapshot( + InstallSnapshotRequestProto request) throws IOException { + return getImpl().installSnapshot(request); + } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + getId().toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java index e7fb3cc..e0149f2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.rpc.RpcFactory; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; /** A factory interface for creating server components. */ @@ -34,7 +35,7 @@ public interface ServerFactory extends RpcFactory { /** Create a new {@link LogAppender}. */ LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f); - RaftServerRpc newRaftServerRpc(RaftServerImpl server); + RaftServerRpc newRaftServerRpc(RaftServer server); abstract class BaseFactory implements ServerFactory { @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index d764999..7934bbb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -37,10 +37,10 @@ public class ServerImplUtils { properties, parameters); } - public static RaftServerImpl newRaftServer( + public static RaftServerProxy newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties, Parameters parameters) throws IOException { - return new RaftServerImpl(id, stateMachine, conf, properties, parameters); + return new RaftServerProxy(id, stateMachine, conf, properties, parameters); } public static TermIndex newTermIndex(long term, long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 8c4a7c6..577560f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -17,19 +17,6 @@ */ package org.apache.ratis; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.Parameters; @@ -37,25 +24,28 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.LeaderState; -import org.apache.ratis.server.impl.RaftConfiguration; -import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.*; import org.apache.ratis.server.storage.MemoryRaftLog; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.statemachine.BaseStateMachine; import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.ExitUtils; -import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.CollectionUtils; -import org.apache.ratis.util.NetUtils; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReflectionUtils; +import org.apache.ratis.util.*; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + public abstract class MiniRaftCluster { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); public static final DelayLocalExecutionInjection logSyncDelay = @@ -107,13 +97,9 @@ public abstract class MiniRaftCluster { } } - public static RaftConfiguration initConfiguration(int numServers) { - return initConfiguration(generateIds(numServers, 0)); - } - - public static RaftConfiguration initConfiguration(String[] ids) { + public static RaftConfiguration initConfiguration(Collection<String> ids) { return RaftConfiguration.newBuilder() - .setConf(Arrays.stream(ids) + .setConf(ids.stream() .map(id -> new RaftPeerId(id)) .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())) .collect(Collectors.toList())) @@ -144,10 +130,10 @@ public abstract class MiniRaftCluster { protected final RaftProperties properties; protected final Parameters parameters; private final String testBaseDir; - protected final Map<RaftPeerId, RaftServerImpl> servers = new ConcurrentHashMap<>(); + protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>(); protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { - this.conf = initConfiguration(ids); + this.conf = initConfiguration(Arrays.asList(ids)); this.properties = new RaftProperties(properties); this.parameters = parameters; @@ -166,13 +152,17 @@ public abstract class MiniRaftCluster { return this; } - private RaftServerImpl putNewServer(RaftPeerId id, boolean format) { - final RaftServerImpl s = newRaftServer(id, format); + private RaftServerProxy putNewServer(RaftPeerId id, boolean format) { + return putNewServer(id, conf, format); + } + + public RaftServerProxy putNewServer(RaftPeerId id, RaftConfiguration raftConf, boolean format) { + final RaftServerProxy s = newRaftServer(id, raftConf, format); Preconditions.assertTrue(servers.put(id, s) == null); return s; } - private Collection<RaftServerImpl> putNewServers( + private Collection<RaftServerProxy> putNewServers( Iterable<RaftPeerId> peers, boolean format) { return StreamSupport.stream(peers.spliterator(), false) .map(id -> putNewServer(id, format)) @@ -182,14 +172,13 @@ public abstract class MiniRaftCluster { public void start() { LOG.info("Starting " + getClass().getSimpleName()); initServers(); - servers.values().forEach(RaftServerImpl::start); + servers.values().forEach(RaftServer::start); } /** * start a stopped server again. */ - public void restartServer(String id, boolean format) throws IOException { - final RaftPeerId newId = new RaftPeerId(id); + public void restartServer(RaftPeerId newId, boolean format) throws IOException { killServer(newId); servers.remove(newId); @@ -197,8 +186,8 @@ public abstract class MiniRaftCluster { } public void restart(boolean format) throws IOException { - servers.values().stream().filter(RaftServerImpl::isAlive) - .forEach(RaftServerImpl::close); + shutdown(); + List<RaftPeerId> idList = new ArrayList<>(servers.keySet()); servers.clear(); putNewServers(idList, format); @@ -213,7 +202,7 @@ public abstract class MiniRaftCluster { return conf; } - private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { + private RaftServerProxy newRaftServer(RaftPeerId id, RaftConfiguration raftConf, boolean format) { try { final String dirStr = testBaseDir + id; if (format) { @@ -222,13 +211,13 @@ public abstract class MiniRaftCluster { final RaftProperties prop = new RaftProperties(properties); RaftServerConfigKeys.setStorageDir(prop, dirStr); final StateMachine stateMachine = getStateMachine4Test(properties); - return newRaftServer(id, stateMachine, conf, prop); + return newRaftServer(id, stateMachine, raftConf, prop); } catch (IOException e) { throw new RuntimeException(e); } } - protected abstract RaftServerImpl newRaftServer( + protected abstract RaftServerProxy newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException; @@ -241,13 +230,17 @@ public abstract class MiniRaftCluster { } public static Collection<RaftPeer> toRaftPeers( - Collection<RaftServerImpl> servers) { + Collection<RaftServerProxy> servers) { return servers.stream() .map(MiniRaftCluster::toRaftPeer) .collect(Collectors.toList()); } public static RaftPeer toRaftPeer(RaftServerImpl s) { + return toRaftPeer(s.getProxy()); + } + + public static RaftPeer toRaftPeer(RaftServerProxy s) { return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()); } @@ -261,7 +254,7 @@ public abstract class MiniRaftCluster { LOG.info("Add new peers {}", Arrays.asList(ids)); // create and add new RaftServers - final Collection<RaftServerImpl> newServers = putNewServers( + final Collection<RaftServerProxy> newServers = putNewServers( CollectionUtils.as(Arrays.asList(ids), RaftPeerId::new), true); newServers.forEach(s -> startServer(s, startNewPeer)); @@ -273,7 +266,7 @@ public abstract class MiniRaftCluster { return new PeerChanges(p, np, new RaftPeer[0]); } - protected void startServer(RaftServerImpl server, boolean startService) { + protected void startServer(RaftServer server, boolean startService) { if (startService) { server.start(); } @@ -318,7 +311,7 @@ public abstract class MiniRaftCluster { public String printServers() { StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServerImpl s : servers.values()) { + for (RaftServer s : servers.values()) { b.append(" "); b.append(s).append("\n"); } @@ -327,7 +320,7 @@ public abstract class MiniRaftCluster { public String printAllLogs() { StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServerImpl s : servers.values()) { + for (RaftServerImpl s : iterateServerImpls()) { b.append(" "); b.append(s).append("\n"); @@ -342,8 +335,8 @@ public abstract class MiniRaftCluster { public RaftServerImpl getLeader() { final List<RaftServerImpl> leaders = new ArrayList<>(); - servers.values().stream() - .filter(s -> s.isAlive() && s.isLeader()) + getServersAliveStream() + .filter(RaftServerImpl::isLeader) .forEach(s -> { if (leaders.isEmpty()) { leaders.add(s); @@ -373,20 +366,26 @@ public abstract class MiniRaftCluster { } public List<RaftServerImpl> getFollowers() { - return servers.values().stream() - .filter(s -> s.isAlive() && s.isFollower()) + return getServersAliveStream() + .filter(RaftServerImpl::isFollower) .collect(Collectors.toList()); } - public Collection<RaftServerImpl> getServers() { + public Collection<RaftServerProxy> getServers() { return servers.values(); } - public RaftServerImpl getServer(String id) { - return getServer(new RaftPeerId(id)); + public Iterable<RaftServerImpl> iterateServerImpls() { + return CollectionUtils.as(getServers(), RaftServerProxy::getImpl); + } + + public Stream<RaftServerImpl> getServersAliveStream() { + return getServers().stream() + .map(RaftServerProxy::getImpl) + .filter(RaftServerImpl::isAlive); } - public RaftServerImpl getServer(RaftPeerId id) { + public RaftServerProxy getServer(RaftPeerId id) { return servers.get(id); } @@ -395,8 +394,12 @@ public abstract class MiniRaftCluster { } public RaftClient createClient(RaftPeerId leaderId) { + return createClient(leaderId, conf.getPeers()); + } + + public RaftClient createClient(RaftPeerId leaderId, Collection<RaftPeer> servers) { return RaftClient.newBuilder() - .setServers(conf.getPeers()) + .setServers(servers) .setLeaderId(leaderId) .setClientRpc(clientFactory.newRaftClientRpc()) .setProperties(properties) @@ -405,8 +408,7 @@ public abstract class MiniRaftCluster { public void shutdown() { LOG.info("Stopping " + getClass().getSimpleName()); - servers.values().stream().filter(RaftServerImpl::isAlive) - .forEach(RaftServerImpl::close); + getServersAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close); if (ExitUtils.isTerminated()) { LOG.error("Test resulted in an unexpected exit", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 1e41944..9c69d03 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -97,7 +97,7 @@ public abstract class RaftBasicTests { Thread.sleep(cluster.getMaxTimeout() + 100); LOG.info(cluster.printAllLogs()); - cluster.getServers().stream().filter(RaftServerImpl::isAlive) + cluster.getServersAliveStream() .map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.assertLogEntries(log, log.getEntries(1, Long.MAX_VALUE), 1, term, messages)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java index 66c54cb..a42d1bd 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java @@ -103,7 +103,7 @@ public abstract class RaftRetryCacheTests { long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex(); // make sure retry cache has the entry - for (RaftServerImpl server : cluster.getServers()) { + for (RaftServerImpl server : cluster.iterateServerImpls()) { LOG.info("check server " + server.getId()); if (server.getState().getLastAppliedIndex() < leaderApplied) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 0680df9..b83e1f2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -19,10 +19,12 @@ package org.apache.ratis; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.com.google.protobuf.ByteString; @@ -63,8 +65,8 @@ public class RaftTestUtil { return leader; } - public static RaftServerImpl waitForLeader(MiniRaftCluster cluster, - final String leaderId) throws InterruptedException { + public static RaftServerImpl waitForLeader( + MiniRaftCluster cluster, final String leaderId) throws InterruptedException { LOG.info(cluster.printServers()); for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) { RaftServerImpl currLeader = cluster.getLeader(); @@ -114,10 +116,11 @@ public class RaftTestUtil { return idxExpected == expectedMessages.length; } - public static void assertLogEntries(Collection<RaftServerImpl> servers, + public static void assertLogEntries(Collection<RaftServerProxy> servers, SimpleMessage... expectedMessages) { final int size = servers.size(); final long count = servers.stream() + .map(RaftServerProxy::getImpl) .filter(RaftServerImpl::isAlive) .map(s -> s.getState().getLog()) .filter(log -> logEntriesContains(log, expectedMessages)) @@ -282,7 +285,8 @@ public class RaftTestUtil { return newLeader; } - public static void blockQueueAndSetDelay(Collection<RaftServerImpl> servers, + public static <SERVER extends RaftServer> void blockQueueAndSetDelay( + Collection<SERVER> servers, DelayLocalExecutionInjection injection, String leaderId, int delayMs, long maxTimeout) throws InterruptedException { // block reqeusts sent to leader if delayMs > 0 http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 524405a..13801a4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -232,7 +232,7 @@ public abstract class RaftReconfigurationBaseTest { // check configuration manager's internal state // each reconf will generate two configurations: (old, new) and (new) - cluster.getServers().stream().filter(RaftServerImpl::isAlive) + cluster.getServersAliveStream() .forEach(server -> { ConfigurationManager confManager = (ConfigurationManager) Whitebox.getInternalState(server.getState(), @@ -338,7 +338,8 @@ public abstract class RaftReconfigurationBaseTest { final RaftLog leaderLog = cluster.getLeader().getState().getLog(); for (RaftPeer newPeer : c1.newPeers) { Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE), - cluster.getServer(newPeer.getId().toString()).getState().getLog() + cluster.getServer(newPeer.getId()) + .getImpl().getState().getLog() .getEntries(0, Long.MAX_VALUE)); } } finally { @@ -524,9 +525,10 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftServerImpl leader = cluster.getLeader(); + final RaftPeerId leaderId = leader.getId(); - final RaftLog log = cluster.getServer(leaderId.toString()).getState().getLog(); + final RaftLog log = leader.getState().getLog(); Thread.sleep(1000); Assert.assertEquals(0, log.getLatestFlushedIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 0762f21..62c68bf 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -47,7 +47,7 @@ public class RaftServerTestUtil { int deadIncluded = 0; final RaftConfiguration current = RaftConfiguration.newBuilder() .setConf(peers).setLogEntryIndex(0).build(); - for (RaftServerImpl server : cluster.getServers()) { + for (RaftServerImpl server : cluster.iterateServerImpls()) { if (deadPeers != null && deadPeers.contains(server.getId().toString())) { if (current.containsInConf(server.getId())) { deadIncluded++; @@ -66,11 +66,6 @@ public class RaftServerTestUtil { Assert.assertEquals(peers.length, numIncluded + deadIncluded); } - public static ConfigurationManager newConfigurationManager( - RaftConfiguration initialConf) { - return new ConfigurationManager(initialConf); - } - public static long getRetryCacheSize(RaftServerImpl server) { return server.getRetryCache().size(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index a307101..1682b20 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -24,6 +24,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.statemachine.StateMachine; import org.slf4j.Logger; @@ -80,7 +81,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { } @Override - protected RaftServerImpl newRaftServer( + protected RaftServerProxy newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { serverRequestReply.addPeer(id); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java index 69707aa..66004c6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java @@ -21,6 +21,7 @@ import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; @@ -63,7 +64,7 @@ class SimulatedRpc implements RpcType { } @Override - public SimulatedServerRpc newRaftServerRpc(RaftServerImpl server) { + public SimulatedServerRpc newRaftServerRpc(RaftServer server) { return new SimulatedServerRpc(server, Objects.requireNonNull(serverRequestReply), Objects.requireNonNull(client2serverRequestReply)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 562dba5..9551da8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -21,8 +21,10 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.Daemon; import org.slf4j.Logger; @@ -43,10 +45,11 @@ class SimulatedServerRpc implements RaftServerRpc { private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler; private final ExecutorService executor = Executors.newFixedThreadPool(3, Daemon::new); - SimulatedServerRpc(RaftServerImpl server, + SimulatedServerRpc(RaftServer server, SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { - this.server = server; + this.server = server instanceof RaftServerProxy? + ((RaftServerProxy)server).getImpl(): (RaftServerImpl)server; this.serverHandler = new RequestHandler<>(server.getId().toString(), "serverHandler", serverRequestReply, serverHandlerImpl, 3); this.clientHandler = new RequestHandler<>(server.getId().toString(), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index f5e2378..cf2edff 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -62,7 +62,7 @@ public abstract class RaftSnapshotBaseTest { static File getSnapshotFile(MiniRaftCluster cluster, int i) { final RaftServerImpl leader = cluster.getLeader(); - final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader); + final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader.getProxy()); return sm.getStateMachineStorage().getSnapshotFile( leader.getState().getCurrentTerm(), i); } @@ -72,7 +72,7 @@ public abstract class RaftSnapshotBaseTest { final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2, leader.getState().getLog().getLastCommittedIndex()); - final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); + final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader.getProxy()).getContent(); for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { Assert.assertEquals(i+1, entries[i].getIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 6cb0234..4ecc282 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -24,8 +24,8 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogInputStream; @@ -58,7 +58,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { = "raft.test.simple.state.machine.take.snapshot"; private static final boolean RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false; - public static SimpleStateMachine4Testing get(RaftServerImpl s) { + public static SimpleStateMachine4Testing get(RaftServer s) { return (SimpleStateMachine4Testing)s.getStateMachine(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/341456e0/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index c1f79a3..cf4880c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -24,8 +24,10 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.util.LogUtils; @@ -96,7 +98,7 @@ public class TestStateMachine { } static class SMTransactionContext extends SimpleStateMachine4Testing { - public static SMTransactionContext get(RaftServerImpl s) { + public static SMTransactionContext get(RaftServer s) { return (SMTransactionContext)s.getStateMachine(); } @@ -167,7 +169,7 @@ public class TestStateMachine { // TODO: there eshould be a better way to ensure all data is replicated and applied Thread.sleep(cluster.getMaxTimeout() + 100); - for (RaftServerImpl raftServer : cluster.getServers()) { + for (RaftServerProxy raftServer : cluster.getServers()) { final SMTransactionContext sm = SMTransactionContext.get(raftServer); sm.rethrowIfException(); assertEquals(numTrx, sm.numApplied.get()); @@ -176,7 +178,7 @@ public class TestStateMachine { // check leader RaftServerImpl raftServer = cluster.getLeader(); // assert every transaction has obtained context in leader - final SMTransactionContext sm = SMTransactionContext.get(raftServer); + final SMTransactionContext sm = SMTransactionContext.get(raftServer.getProxy()); List<Long> ll = sm.applied.stream().collect(Collectors.toList()); Collections.sort(ll); assertEquals(ll.toString(), ll.size(), numTrx);
