Repository: incubator-ratis Updated Branches: refs/heads/master 038ef310c -> c3407a220
RATIS-240. Move stateMachine from RaftServerProxy to RaftServerImpl. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c3407a22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c3407a22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c3407a22 Branch: refs/heads/master Commit: c3407a2206e5f9acac20efbc6998ae19164df5ce Parents: 038ef31 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Jun 7 21:23:39 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Jun 7 21:23:39 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/grpc/TestRaftServerWithGrpc.java | 12 ++++++------ .../main/java/org/apache/ratis/server/RaftServer.java | 6 ------ .../org/apache/ratis/server/impl/RaftServerImpl.java | 14 +++++++++----- .../org/apache/ratis/server/impl/RaftServerProxy.java | 13 ++++--------- .../org/apache/ratis/server/impl/ServerImplUtils.java | 9 ++++++++- .../org/apache/ratis/statemachine/StateMachine.java | 6 ++++++ .../test/java/org/apache/ratis/RaftAsyncTests.java | 4 ++-- .../apache/ratis/server/impl/RaftServerTestUtil.java | 11 ----------- .../ratis/statemachine/RaftSnapshotBaseTest.java | 4 ++-- .../statemachine/SimpleStateMachine4Testing.java | 4 ++-- .../apache/ratis/statemachine/TestStateMachine.java | 7 +++---- 11 files changed, 42 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index cc8c8fb..e5e95ee 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -21,7 +21,8 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.impl.ServerImplUtils; +import org.apache.ratis.statemachine.StateMachine; import org.junit.Test; import java.io.IOException; @@ -40,8 +41,8 @@ public class TestRaftServerWithGrpc extends BaseTest { // Create a raft server proxy with server rpc bound to a different address // compared to leader. This helps in locking the raft storage directory to // be used by next raft server proxy instance. - RaftServerTestUtil.getRaftServerProxy(leaderId, cluster.getLeader().getStateMachine(), cluster.getGroup(), - new RaftProperties(), null); + final StateMachine stateMachine = cluster.getLeader().getStateMachine(); + ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null); // Close the server rpc for leader so that new raft server can be bound to it. cluster.getLeader().getServerRpc().close(); @@ -50,9 +51,8 @@ public class TestRaftServerWithGrpc extends BaseTest { // the raft server proxy created earlier. Raft server proxy should close // the rpc server on failure. testFailureCase("start a new server with the same address", - () -> RaftServerTestUtil.getRaftServerProxy(leaderId, cluster.getLeader().getStateMachine(), - cluster.getGroup(), properties, null), - IOException.class, OverlappingFileLockException.class); + () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null), + IOException.class, IOException.class, OverlappingFileLockException.class); // Try to start a raft server rpc at the leader address. cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId)); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index 085f2d1..e563705 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -48,12 +48,6 @@ public interface RaftServer extends Closeable, RpcType.Get, /** Start this server. */ void start(); - /** - * Returns the StateMachine instance. - * @return the StateMachine instance. - */ - StateMachine getStateMachine(); - /** @return a {@link Builder}. */ static Builder newBuilder() { return new Builder(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 93666df..5032e88 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -67,6 +67,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } private final RaftServerProxy proxy; + private final StateMachine stateMachine; private final int minTimeoutMs; private final int maxTimeoutMs; @@ -90,18 +91,21 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou private final RaftServerJmxAdapter jmxAdapter; - RaftServerImpl(RaftPeerId id, RaftGroup group, RaftServerProxy proxy, - RaftProperties properties) throws IOException { - LOG.debug("new RaftServerImpl {}, {}", id , group); + RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException { + final RaftPeerId id = proxy.getId(); + LOG.debug("{}: new RaftServerImpl for {}", id, group); this.groupId = group.getGroupId(); this.lifeCycle = new LifeCycle(id); + this.stateMachine = stateMachine; + + final RaftProperties properties = proxy.getProperties(); minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs, "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); this.proxy = proxy; - this.state = new ServerState(id, group, properties, this, proxy.getStateMachine()); + this.state = new ServerState(id, group, properties, this, stateMachine); this.retryCache = initRetryCache(properties); this.jmxAdapter = new RaftServerJmxAdapter(); @@ -142,7 +146,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } public StateMachine getStateMachine() { - return proxy.getStateMachine(); + return stateMachine; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 5f1717a..92ed370 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -42,8 +42,8 @@ public class RaftServerProxy implements RaftServer { public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class); private final RaftPeerId id; - private final StateMachine stateMachine; private final RaftProperties properties; + private final StateMachine.Registry stateMachineRegistry; private final RaftServerRpc serverRpc; private final ServerFactory factory; @@ -51,11 +51,11 @@ public class RaftServerProxy implements RaftServer { private volatile CompletableFuture<RaftServerImpl> impl; private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>(); - RaftServerProxy(RaftPeerId id, StateMachine stateMachine, + RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group, RaftProperties properties, Parameters parameters) throws IOException { this.properties = properties; - this.stateMachine = stateMachine; + this.stateMachineRegistry = stateMachineRegistry; final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); this.factory = ServerFactory.cast(rpcType.newFactory(parameters)); @@ -78,7 +78,7 @@ public class RaftServerProxy implements RaftServer { } private RaftServerImpl initImpl(RaftGroup group) throws IOException { - return new RaftServerImpl(id, group, this, properties); + return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this); } private static String getIdStringFrom(RaftServerRpc rpc) { @@ -108,11 +108,6 @@ public class RaftServerProxy implements RaftServer { } @Override - public StateMachine getStateMachine() { - return stateMachine; - } - - @Override public RaftProperties getProperties() { return properties; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 15ee155..d9e0ee9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -30,14 +30,21 @@ import java.io.IOException; /** Server utilities for internal use. */ public class ServerImplUtils { + /** For the case that all {@link RaftServerImpl} objects share the same {@link StateMachine}. */ public static RaftServerProxy newRaftServer( RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties, Parameters parameters) throws IOException { + return newRaftServer(id, group, gid -> stateMachine, properties, parameters); + } + + public static RaftServerProxy newRaftServer( + RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, + RaftProperties properties, Parameters parameters) throws IOException { final RaftServerProxy proxy; try { // attempt multiple times to avoid temporary bind exception proxy = JavaUtils.attempt( - () -> new RaftServerProxy(id, stateMachine, group, properties, parameters), + () -> new RaftServerProxy(id, stateMachineRegistry, group, properties, parameters), 5, 500L, "new RaftServerProxy", RaftServerProxy.LOG); } catch (InterruptedException e) { throw IOUtils.toInterruptedIOException( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 7d35796..8fa3c90 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -20,6 +20,7 @@ package org.apache.ratis.statemachine; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftConfiguration; @@ -34,6 +35,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; /** * StateMachine is the entry point for the custom implementation of replicated state as defined in @@ -43,6 +45,10 @@ import java.util.concurrent.CompletableFuture; public interface StateMachine extends Closeable { Logger LOG = LoggerFactory.getLogger(StateMachine.class); + /** A registry to support different state machines in multi-raft environment. */ + interface Registry extends Function<RaftGroupId, StateMachine> { + } + /** * Initializes the State Machine with the given properties and storage. The state machine is * responsible reading the latest snapshot from the file system (if any) and initialize itself http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index a343f76..53e0f1f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -103,7 +103,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba final RaftClient client = cluster.createClient(); //Set blockTransaction flag so that transaction blocks for (RaftServerProxy server : cluster.getServers()) { - ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(true); + ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(true); } //Send numMessages which are blocked and do not release the client semaphore permits @@ -133,7 +133,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba //Unset the blockTransaction flag so that semaphore permits can be released for (RaftServerProxy server : cluster.getServers()) { - ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(false); + ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(false); } for(int i=0; i<=numMessages; i++){ futures[i].join(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index f84576d..a72e6f5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -18,19 +18,13 @@ package org.apache.ratis.server.impl; import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.conf.Parameters; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.JavaUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collection; import java.util.stream.Stream; @@ -86,11 +80,6 @@ public class RaftServerTestUtil { return entry.isFailed(); } - public static RaftServerProxy getRaftServerProxy(RaftPeerId id, StateMachine stateMachine, - RaftGroup group, RaftProperties properties, Parameters parameters) throws IOException { - return new RaftServerProxy(id, stateMachine, group, properties, parameters); - } - public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) { return server.getLeaderState().getLogAppenders(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 387319d..ef018e5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -57,7 +57,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { static File getSnapshotFile(MiniRaftCluster cluster, int i) { final RaftServerImpl leader = cluster.getLeader(); - final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader.getProxy()); + final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader); return sm.getStateMachineStorage().getSnapshotFile( leader.getState().getCurrentTerm(), i); } @@ -67,7 +67,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2, leader.getState().getLog().getLastCommittedIndex()); - final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader.getProxy()).getContent(); + final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { Assert.assertEquals(i+1, entries[i].getIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 91643db..5d66ca3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -24,9 +24,9 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; -import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogInputStream; import org.apache.ratis.server.storage.LogOutputStream; @@ -61,7 +61,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { = "raft.test.simple.state.machine.take.snapshot"; private static final boolean RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false; - public static SimpleStateMachine4Testing get(RaftServer s) { + public static SimpleStateMachine4Testing get(RaftServerImpl s) { return (SimpleStateMachine4Testing)s.getStateMachine(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index 611435a..a045ecd 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -25,7 +25,6 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; @@ -95,7 +94,7 @@ public class TestStateMachine extends BaseTest { } static class SMTransactionContext extends SimpleStateMachine4Testing { - public static SMTransactionContext get(RaftServer s) { + public static SMTransactionContext get(RaftServerImpl s) { return (SMTransactionContext)s.getStateMachine(); } @@ -167,7 +166,7 @@ public class TestStateMachine extends BaseTest { Thread.sleep(cluster.getMaxTimeout() + 100); for (RaftServerProxy raftServer : cluster.getServers()) { - final SMTransactionContext sm = SMTransactionContext.get(raftServer); + final SMTransactionContext sm = SMTransactionContext.get(raftServer.getImpl()); sm.rethrowIfException(); assertEquals(numTrx, sm.numApplied.get()); } @@ -175,7 +174,7 @@ public class TestStateMachine extends BaseTest { // check leader RaftServerImpl raftServer = cluster.getLeader(); // assert every transaction has obtained context in leader - final SMTransactionContext sm = SMTransactionContext.get(raftServer.getProxy()); + final SMTransactionContext sm = SMTransactionContext.get(raftServer); List<Long> ll = sm.applied.stream().collect(Collectors.toList()); Collections.sort(ll); assertEquals(ll.toString(), ll.size(), numTrx);
