Repository: incubator-ratis Updated Branches: refs/heads/master d16c7dd25 -> 021ee857f
RATIS-332. Allow RaftServer.Builder to set StateMachine.Registry. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/021ee857 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/021ee857 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/021ee857 Branch: refs/heads/master Commit: 021ee857f5b24d5d8a8ad40fe917803869b0bb75 Parents: d16c7dd Author: Lokesh Jain <[email protected]> Authored: Thu Sep 27 16:49:26 2018 +0530 Committer: Lokesh Jain <[email protected]> Committed: Thu Sep 27 16:49:26 2018 +0530 ---------------------------------------------------------------------- .../ratis/grpc/MiniRaftClusterWithGrpc.java | 4 +- .../ratis/grpc/TestRaftServerWithGrpc.java | 4 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 4 +- .../ratis/netty/MiniRaftClusterWithNetty.java | 4 +- .../org/apache/ratis/server/RaftServer.java | 11 ++- .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/impl/ServerImplUtils.java | 4 +- .../java/org/apache/ratis/MiniRaftCluster.java | 30 +++++--- .../MiniRaftClusterWithSimulatedRpc.java | 5 +- .../ratis/statemachine/TestStateMachine.java | 80 +++++++++++++------- 10 files changed, 95 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 176cfa0..92401ba 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -57,10 +57,10 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftGroup group, + RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group, RaftProperties properties) throws IOException { GrpcConfigKeys.Server.setPort(properties, getPort(id, group)); - return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null); + return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index 8a9e94b..2ec7ae8 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -42,7 +42,7 @@ public class TestRaftServerWithGrpc extends BaseTest { // compared to leader. This helps in locking the raft storage directory to // be used by next raft server proxy instance. final StateMachine stateMachine = cluster.getLeader().getStateMachine(); - ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null); + ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, properties, null); // Close the server rpc for leader so that new raft server can be bound to it. cluster.getLeader().getServerRpc().close(); @@ -51,7 +51,7 @@ public class TestRaftServerWithGrpc extends BaseTest { // the raft server proxy created earlier. Raft server proxy should close // the rpc server on failure. testFailureCase("start a new server with the same address", - () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null).start(), + () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, properties, null).start(), IOException.class, OverlappingFileLockException.class); // Try to start a raft server rpc at the leader address. cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index 44a1900..85c1092 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -80,13 +80,13 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftGroup group, + RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group, RaftProperties properties) throws IOException { final Configuration hconf = new Configuration(hadoopConf); final String address = "0.0.0.0:" + getPort(id, group); HadoopConfigKeys.Ipc.setAddress(hconf, address); - return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, + return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, HadoopFactory.newRaftParameters(hconf)); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 3941f33..6f30964 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -56,10 +56,10 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftGroup group, + RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group, RaftProperties properties) throws IOException { NettyConfigKeys.Server.setPort(properties, getPort(id, group)); - return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null); + return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index ac3111e..111b31d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -63,7 +63,7 @@ public interface RaftServer extends Closeable, RpcType.Get, /** To build {@link RaftServer} objects. */ class Builder { private RaftPeerId serverId; - private StateMachine stateMachine; + private StateMachine.Registry stateMachineRegistry ; private RaftGroup group = null; private RaftProperties properties; private Parameters parameters; @@ -73,7 +73,7 @@ public interface RaftServer extends Closeable, RpcType.Get, return ServerImplUtils.newRaftServer( serverId, group, - Objects.requireNonNull(stateMachine, "The 'stateMachine' is not initialized."), + Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' is initialized."), Objects.requireNonNull(properties, "The 'properties' field is not initialized."), parameters); } @@ -86,7 +86,12 @@ public interface RaftServer extends Closeable, RpcType.Get, /** Set the {@link StateMachine} of the server. */ public Builder setStateMachine(StateMachine stateMachine) { - this.stateMachine = stateMachine; + return setStateMachineRegistry(gid -> stateMachine); + } + + /** Set the {@link StateMachine.Registry} of the server. */ + public Builder setStateMachineRegistry(StateMachine.Registry stateMachineRegistry ) { + this.stateMachineRegistry = stateMachineRegistry ; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 9de81ff..7fde782 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -89,7 +89,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException { final RaftPeerId id = proxy.getId(); - LOG.debug("{}: new RaftServerImpl for {}", id, group); + LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine); this.groupId = group.getGroupId(); this.lifeCycle = new LifeCycle(id); this.stateMachine = stateMachine; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 2befb38..24b5530 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -32,10 +32,10 @@ import java.io.IOException; public class ServerImplUtils { /** For the case that all {@link RaftServerImpl} objects share the same {@link StateMachine}. */ public static RaftServerProxy newRaftServer( - RaftPeerId id, RaftGroup group, StateMachine stateMachine, + RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) throws IOException { RaftServerProxy.LOG.debug("newRaftServer: {}, {}", id, group); - final RaftServerProxy proxy = newRaftServer(id, gid -> stateMachine, properties, parameters); + final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters); proxy.initGroups(group); return proxy; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index d2aef5b..60f7050 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -59,7 +59,7 @@ public abstract class MiniRaftCluster implements Closeable { public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; - public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class; + private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine(); public static abstract class Factory<CLUSTER extends MiniRaftCluster> { public interface Get<CLUSTER extends MiniRaftCluster> { @@ -149,6 +149,8 @@ public abstract class MiniRaftCluster implements Closeable { protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>(); protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>(); + private volatile StateMachine.Registry stateMachineRegistry = null; + private final Timer timer; protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { @@ -236,33 +238,41 @@ public abstract class MiniRaftCluster implements Closeable { } final RaftProperties prop = new RaftProperties(properties); RaftServerConfigKeys.setStorageDir(prop, dir); - final StateMachine stateMachine = getStateMachine4Test(properties); - return newRaftServer(id, stateMachine, group, prop); + return newRaftServer(id, getStateMachineRegistry(properties), group, prop); } catch (IOException e) { throw new RuntimeException(e); } } protected abstract RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftGroup group, + RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group, RaftProperties properties) throws IOException; - static StateMachine getStateMachine4Test(RaftProperties properties) { + public void setStateMachineRegistry(StateMachine.Registry stateMachineRegistry) { + this.stateMachineRegistry = stateMachineRegistry; + } + + StateMachine.Registry getStateMachineRegistry(RaftProperties properties) { + if (stateMachineRegistry != null) { + return stateMachineRegistry; + } + final Class<? extends StateMachine> smClass = properties.getClass( - STATEMACHINE_CLASS_KEY, - STATEMACHINE_CLASS_DEFAULT, - StateMachine.class); + STATEMACHINE_CLASS_KEY, null, StateMachine.class); + if (smClass == null) { + return STATEMACHINE_REGISTRY_DEFAULT; + } final RuntimeException exception; try { - return ReflectionUtils.newInstance(smClass); + return gid -> ReflectionUtils.newInstance(smClass); } catch(RuntimeException e) { exception = e; } try { final Class<?>[] argClasses = {RaftProperties.class}; - return ReflectionUtils.newInstance(smClass, argClasses, properties); + return gid -> ReflectionUtils.newInstance(smClass, argClasses, properties); } catch(RuntimeException e) { exception.addSuppressed(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 2287174..2de422b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -88,12 +88,11 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftGroup group, + RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group, RaftProperties properties) throws IOException { serverRequestReply.addPeer(id); client2serverRequestReply.addPeer(id); - return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, - parameters); + return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, parameters); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/021ee857/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index 189c9da..3f18c41 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -25,8 +25,14 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.impl.TransactionContextImpl; @@ -36,7 +42,9 @@ import org.junit.*; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -48,37 +56,13 @@ import static org.junit.Assert.*; /** * Test StateMachine related functionality */ -public class TestStateMachine extends BaseTest { +public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSimulatedRpc.FactoryGet { static { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } - public static final int NUM_SERVERS = 5; - - private final RaftProperties properties = new RaftProperties(); - { - // TODO: fix and run with in-memory log. It fails with NPE - // TODO: if change setUseMemory to true - RaftServerConfigKeys.Log.setUseMemory(properties, false); - } - - private MiniRaftClusterWithSimulatedRpc cluster; - - @Before - public void setup() throws IOException { - properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class); - - cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(NUM_SERVERS, properties); - cluster.start(); - } - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } - } + public static final int NUM_SERVERS = 3; static class SMTransactionContext extends SimpleStateMachine4Testing { public static SMTransactionContext get(RaftServerImpl s) { @@ -135,6 +119,20 @@ public class TestStateMachine extends BaseTest { @Test public void testTransactionContextIsPassedBack() throws Throwable { + final RaftProperties properties = new RaftProperties(); + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class); + + // TODO: fix and run with in-memory log. It fails with NPE + // TODO: if change setUseMemory to true + RaftServerConfigKeys.Log.setUseMemory(properties, false); + + try(MiniRaftClusterWithSimulatedRpc cluster = getFactory().newCluster(NUM_SERVERS, properties)) { + cluster.start(); + runTestTransactionContextIsPassedBack(cluster); + } + } + + static void runTestTransactionContextIsPassedBack(MiniRaftCluster cluster) throws Throwable { // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM int numTrx = 100; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx); @@ -164,4 +162,34 @@ public class TestStateMachine extends BaseTest { assertEquals(ll.toString(), Long.valueOf(i+1), ll.get(i)); } } + + @Test + public void testStateMachineRegistry() throws Throwable { + final Map<RaftGroupId, StateMachine> registry = new ConcurrentHashMap<>(); + registry.put(RaftGroupId.randomId(), new SimpleStateMachine4Testing()); + registry.put(RaftGroupId.randomId(), new SMTransactionContext()); + + try(MiniRaftClusterWithSimulatedRpc cluster = newCluster(0)) { + cluster.setStateMachineRegistry(registry::get); + + final RaftPeerId id = RaftPeerId.valueOf("s0"); + cluster.putNewServer(id, null, true); + cluster.start(); + + for(RaftGroupId gid : registry.keySet()) { + final RaftGroup newGroup = RaftGroup.valueOf(gid, cluster.getPeers()); + LOG.info("add new group: " + newGroup); + final RaftClient client = cluster.createClient(newGroup); + for(RaftPeer p : newGroup.getPeers()) { + client.groupAdd(newGroup, p.getId()); + } + } + + final RaftServerProxy proxy = cluster.getServer(id); + for(Map.Entry<RaftGroupId, StateMachine> e: registry.entrySet()) { + final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(proxy, e.getKey()); + Assert.assertSame(e.getValue(), impl.getStateMachine()); + } + } + } }
