Repository: incubator-ratis Updated Branches: refs/heads/master 16eb8cc6b -> e1620e804
RATIS-21. Add RpcType and ServerFactory. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e1620e80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e1620e80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e1620e80 Branch: refs/heads/master Commit: e1620e804c5be0fa735065631d45f2f6d8db4502 Parents: 16eb8cc Author: Jing Zhao <[email protected]> Authored: Thu Feb 16 17:05:28 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Thu Feb 16 17:05:28 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/ratis/RpcType.java | 34 ++++++++++++ .../org/apache/ratis/conf/RaftProperties.java | 7 ++- .../java/org/apache/ratis/util/RaftUtils.java | 52 ++++++++++++++---- .../org/apache/ratis/grpc/RaftGRpcService.java | 6 +++ .../ratis/grpc/server/GrpcServerFactory.java | 32 ++++++++++++ .../server/PipelinedLogAppenderFactory.java | 32 ------------ .../ratis/grpc/MiniRaftClusterWithGRpc.java | 14 +---- .../grpc/TestNotLeaderExceptionWithGrpc.java | 6 --- .../grpc/TestRaftReconfigurationWithGRpc.java | 11 ---- .../org/apache/ratis/grpc/TestRaftStream.java | 37 +++++-------- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 11 ---- .../hadooprpc/server/HadoopRpcService.java | 6 +++ .../ratis/netty/server/NettyRpcService.java | 6 +++ .../ratis/server/RaftServerConfigKeys.java | 46 ++++++++++++++-- .../org/apache/ratis/server/RaftServerRpc.java | 3 +- .../apache/ratis/server/impl/LeaderState.java | 6 +-- .../ratis/server/impl/LogAppenderFactory.java | 31 ----------- .../ratis/server/impl/RaftServerImpl.java | 22 +++----- .../apache/ratis/server/impl/ServerFactory.java | 55 ++++++++++++++++++++ .../server/simulation/SimulatedServerRpc.java | 6 +++ 20 files changed, 257 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/RpcType.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/RpcType.java new file mode 100644 index 0000000..7787613 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/RpcType.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +/** The type of RPC implementations. */ +public enum RpcType { + NETTY, GRPC, HADOOP, SIMULATED; + + /** Same as {@link #valueOf(String)} except that this method is case insensitive. */ + public static RpcType valueOfIgnoreCase(String s) { + return valueOf(s.toUpperCase()); + } + + /** An interface to get {@link RpcType}. */ + public interface Get { + /** @return the {@link RpcType}. */ + RpcType getRpcType(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java index 187f6ce..fc93398 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java @@ -1227,15 +1227,14 @@ public class RaftProperties { * @return property value as a <code>Class</code>, * or <code>defaultValue</code>. */ - public <U> Class<? extends U> getClass(String name, - Class<? extends U> defaultValue, - Class<U> xface) { + public <BASE, SUB extends BASE> Class<SUB> getClass( + String name, Class<SUB> defaultValue, Class<BASE> xface) { try { Class<?> theClass = getClass(name, defaultValue); if (theClass != null && !xface.isAssignableFrom(theClass)) throw new RuntimeException(theClass+" not "+xface.getName()); else if (theClass != null) - return theClass.asSubclass(xface); + return (Class<SUB>)theClass.asSubclass(xface); else return null; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java index 0b3d24e..17e2e41 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java @@ -29,13 +29,14 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; public abstract class RaftUtils { public static final Logger LOG = LoggerFactory.getLogger(RaftUtils.class); - private static final Class<?>[] EMPTY_CLASS_ARRAY = {}; // OSType detection public enum OSType { @@ -116,26 +117,55 @@ public abstract class RaftUtils { } /** - * Create an object for the given class and initialize it from conf + * Create an object for the given class using its default constructor. * - * @param theClass class of which an object is created + * @param clazz class of which an object is created * @return a new object */ - @SuppressWarnings("unchecked") - public static <T> T newInstance(Class<T> theClass, Object... initArgs) { - T result; + public static <T> T newInstance(Class<T> clazz) { + Objects.requireNonNull(clazz, "clazz == null"); try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + @SuppressWarnings("unchecked") + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); if (meth == null) { - meth = theClass.getDeclaredConstructor(EMPTY_CLASS_ARRAY); + meth = clazz.getDeclaredConstructor(); meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); + CONSTRUCTOR_CACHE.put(clazz, meth); } - result = meth.newInstance(initArgs); + return meth.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } - return result; + } + + /** + * Create a memoized supplier which gets a value by invoking the initializer once + * and then keeps returning the same value as its supplied results. + * + * @param initializer to supply at most one non-null value. + * @param <T> The supplier result type. + * @return a memoized supplier which is thread-safe. + */ + public static <T> Supplier<T> memoize(Supplier<T> initializer) { + Objects.requireNonNull(initializer, "initializer == null"); + return new Supplier<T>() { + private volatile T value = null; + + @Override + public T get() { + T v = value; + if (v == null) { + synchronized (this) { + v = value; + if (v == null) { + v = value = Objects.requireNonNull(initializer.get(), + "initializer.get() returns null"); + } + } + } + return v; + } + }; } public static int getRandomBetween(int min, int max) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 48acf35..473a5c6 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -19,6 +19,7 @@ package org.apache.ratis.grpc; import com.google.common.base.Preconditions; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.RpcType; import org.apache.ratis.grpc.client.RaftClientProtocolService; import org.apache.ratis.grpc.server.RaftServerProtocolClient; import org.apache.ratis.grpc.server.RaftServerProtocolService; @@ -109,6 +110,11 @@ public class RaftGRpcService implements RaftServerRpc { } @Override + public RpcType getRpcType() { + return RpcType.GRPC; + } + + @Override public void start() { // do nothing } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java new file mode 100644 index 0000000..09e3265 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.server.impl.FollowerInfo; +import org.apache.ratis.server.impl.LeaderState; +import org.apache.ratis.server.impl.LogAppender; +import org.apache.ratis.server.impl.ServerFactory; +import org.apache.ratis.server.impl.RaftServerImpl; + +public class GrpcServerFactory implements ServerFactory { + @Override + public LogAppender newLogAppender(RaftServerImpl server, LeaderState state, + FollowerInfo f) { + return new GRpcLogAppender(server, state, f); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java deleted file mode 100644 index d30b391..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.server; - -import org.apache.ratis.server.impl.FollowerInfo; -import org.apache.ratis.server.impl.LeaderState; -import org.apache.ratis.server.impl.LogAppender; -import org.apache.ratis.server.impl.LogAppenderFactory; -import org.apache.ratis.server.impl.RaftServerImpl; - -public class PipelinedLogAppenderFactory implements LogAppenderFactory { - @Override - public LogAppender getLogAppender(RaftServerImpl server, LeaderState state, - FollowerInfo f) { - return new GRpcLogAppender(server, state, f); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index 85829e5..757c7ea 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -18,23 +18,18 @@ package org.apache.ratis.grpc; import com.google.common.base.Preconditions; - import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; -import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.LogAppenderFactory; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.util.NetUtils; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -61,17 +56,10 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties, boolean formatted) throws IOException { - super(ids, getPropForGrpc(properties), formatted); + super(ids, new RaftProperties(properties), formatted); init(initRpcServices(getServers(), properties)); } - private static RaftProperties getPropForGrpc(RaftProperties prop) { - RaftProperties newProp = new RaftProperties(prop); - newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - return newProp; - } - private static Map<RaftPeer, RaftGRpcService> initRpcServices( Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException { final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java index 351e406..8b4c504 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java @@ -20,10 +20,6 @@ package org.apache.ratis.grpc; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftNotLeaderExceptionBaseTest; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; -import org.apache.ratis.server.impl.LogAppenderFactory; - -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; import java.io.IOException; @@ -32,8 +28,6 @@ public class TestNotLeaderExceptionWithGrpc extends RaftNotLeaderExceptionBaseTe public MiniRaftCluster initCluster() throws IOException { String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0); RaftProperties prop = new RaftProperties(); - prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); return new MiniRaftClusterWithGRpc(s, prop, true); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java index 450eb6e..ebc8a6d 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java @@ -18,14 +18,9 @@ package org.apache.ratis.grpc; import org.apache.log4j.Level; -import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; import org.apache.ratis.grpc.server.RaftServerProtocolService; -import org.apache.ratis.server.impl.LogAppenderFactory; import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; import org.apache.ratis.util.RaftUtils; -import org.junit.BeforeClass; - -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; import java.io.IOException; @@ -34,12 +29,6 @@ public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG); } - @BeforeClass - public static void setProp() { - prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - } - @Override public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException { return new MiniRaftClusterWithGRpc(peerNum, prop); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index 99e98c6..ed130dd 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -19,19 +19,16 @@ package org.apache.ratis.grpc; import org.apache.log4j.Level; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.AppendStreamer; import org.apache.ratis.grpc.client.RaftOutputStream; -import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; -import org.apache.ratis.server.impl.LogAppenderFactory; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.RaftUtils; import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +43,6 @@ import java.util.function.Supplier; import static org.apache.ratis.RaftTestUtil.waitForLeader; import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; import static org.junit.Assert.fail; public class TestRaftStream { @@ -57,16 +53,11 @@ public class TestRaftStream { private static final RaftProperties prop = new RaftProperties(); private static final int NUM_SERVERS = 3; + private static final byte[] BYTES = new byte[4]; private MiniRaftClusterWithGRpc cluster; - @BeforeClass - public static void setProp() { - prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - } - @After public void tearDown() { if (cluster != null) { @@ -74,12 +65,10 @@ public class TestRaftStream { } } - private byte[] genContent(int count) { - return toBytes(count); - } - private byte[] toBytes(int i) { - byte[] b = new byte[4]; + return toBytes(i, BYTES); + } + private byte[] toBytes(int i, byte[] b) { b[0] = (byte) ((i >>> 24) & 0xFF); b[1] = (byte) ((i >>> 16) & 0xFF); b[2] = (byte) ((i >>> 8) & 0xFF); @@ -98,21 +87,20 @@ public class TestRaftStream { cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - int count = 1; + final Random r = new Random(); + final long seed = r.nextLong(); + r.setSeed(seed); try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), - cluster.getPeers(), leader.getId())) { + cluster.getPeers(), leader.getId())) { for (int i = 0; i < 500; i++) { // generate 500 requests - out.write(genContent(count++)); + out.write(toBytes(r.nextInt())); } } // check the leader's raft log final RaftLog raftLog = leader.getState().getLog(); - final AtomicInteger currentNum = new AtomicInteger(1); - checkLog(raftLog, 500, () -> { - int value = currentNum.getAndIncrement(); - return toBytes(value); - }); + r.setSeed(seed); + checkLog(raftLog, 500, () -> toBytes(r.nextInt())); } private void checkLog(RaftLog raftLog, long expectedCommittedIndex, @@ -301,6 +289,7 @@ public class TestRaftStream { }).start(); // force change the leader + Thread.sleep(500); RaftTestUtil.waitAndKillLeader(cluster, true); final RaftServerImpl newLeader = waitForLeader(cluster); Assert.assertNotEquals(leader.getId(), newLeader.getId()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index b60e30d..1ca602f 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -19,17 +19,12 @@ package org.apache.ratis.grpc; import org.apache.log4j.Level; import org.apache.ratis.RaftBasicTests; -import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; -import org.apache.ratis.server.impl.LogAppenderFactory; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.util.RaftUtils; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; - import java.io.IOException; public class TestRaftWithGrpc extends RaftBasicTests { @@ -39,12 +34,6 @@ public class TestRaftWithGrpc extends RaftBasicTests { private final MiniRaftClusterWithGRpc cluster; - @BeforeClass - public static void setProp() { - properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - } - public TestRaftWithGrpc() throws IOException { cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties); Assert.assertNull(cluster.getLeader()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index eb93c5c..7f4a251 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc.server; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; import org.apache.hadoop.ipc.RPC; +import org.apache.ratis.RpcType; import org.apache.ratis.hadooprpc.Proxy; import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB; import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; @@ -102,6 +103,11 @@ public class HadoopRpcService implements RaftServerRpc { } @Override + public RpcType getRpcType() { + return RpcType.HADOOP; + } + + @Override public InetSocketAddress getInetSocketAddress() { return ipcServerAddress; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index a659665..5a2bac5 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -27,6 +27,7 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.RpcType; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; @@ -124,6 +125,11 @@ public final class NettyRpcService implements RaftServerRpc { .bind(port); } + @Override + public RpcType getRpcType() { + return RpcType.NETTY; + } + private Channel getChannel() { return channelFuture.awaitUninterruptibly().channel(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index d3c5173..09c77a9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -17,7 +17,7 @@ */ package org.apache.ratis.server; -import org.apache.ratis.server.impl.LogAppenderFactory; +import org.apache.ratis.RpcType; import org.apache.ratis.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,16 +56,52 @@ public interface RaftServerConfigKeys { } } + enum Factory { + NETTY("org.apache.ratis.server.impl.ServerFactory$BaseFactory"), + GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"), + HADOOP("org.apache.ratis.server.impl.ServerFactory$BaseFactory"), + SIMULATED("org.apache.ratis.server.impl.ServerFactory$BaseFactory"); + + public static String getKey(String rpcType) { + return RaftServerConfigKeys.PREFIX + ".factory." + rpcType + ".class"; + } + + public static Factory valueOf(RpcType rpcType) { + return valueOf(rpcType.name()); + } + + private final RpcType rpcType = RpcType.valueOf(name()); + private final String key = getKey(name().toLowerCase()); + private final String defaultClass; + + Factory(String defaultClass) { + this.defaultClass = defaultClass; + } + + public RpcType getRpcType() { + return rpcType; + } + + public String getKey() { + return key; + } + + public String getDefaultClass() { + return defaultClass; + } + + @Override + public String toString() { + return getRpcType() + ":" + getKey() + ":" + getDefaultClass(); + } + } + String RAFT_SERVER_USE_MEMORY_LOG_KEY = "raft.server.use.memory.log"; boolean RAFT_SERVER_USE_MEMORY_LOG_DEFAULT = false; String RAFT_SERVER_STORAGE_DIR_KEY = "raft.server.storage.dir"; String RAFT_SERVER_STORAGE_DIR_DEFAULT = "file:///tmp/raft-server/"; - String RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY = "raft.server.log.appender.factory.class"; - Class<? extends LogAppenderFactory> RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT - = LogAppenderFactory.SynchronousLogAppenderFactory.class; - /** whether trigger snapshot when log size exceeds limit */ String RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY = "raft.server.auto.snapshot.enabled"; /** by default let the state machine to decide when to do checkpoint */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java index 61b3b2e..e68c536 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server; +import org.apache.ratis.RpcType; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.protocol.RaftServerProtocol; @@ -28,7 +29,7 @@ import java.net.InetSocketAddress; * An server-side interface for supporting different RPC implementations * such as Netty, gRPC and Hadoop. */ -public interface RaftServerRpc extends RaftServerProtocol, Closeable { +public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeable { /** To build {@link RaftServerRpc} objects. */ abstract class Builder<B extends Builder, RPC extends RaftServerRpc> { private RaftServer server; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 35d120c..d5d6adc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -139,9 +139,10 @@ public class LeaderState { final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); final long nextIndex = raftLog.getNextIndex(); senders = new ArrayList<>(others.size()); + for (RaftPeer p : others) { FollowerInfo f = new FollowerInfo(p, t, nextIndex, true); - senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f)); + senders.add(server.getFactory().newLogAppender(server, this, f)); } voterLists = divideFollowers(conf); } @@ -263,8 +264,7 @@ public class LeaderState { final long nextIndex = raftLog.getNextIndex(); for (RaftPeer peer : newMembers) { FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false); - LogAppender sender = server.getLogAppenderFactory() - .getLogAppender(server, this, f); + LogAppender sender = server.getFactory().newLogAppender(server, this, f); senders.add(sender); sender.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java deleted file mode 100644 index e6cc213..0000000 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.impl; - -public interface LogAppenderFactory { - LogAppender getLogAppender(RaftServerImpl server, LeaderState state, - FollowerInfo f); - - class SynchronousLogAppenderFactory implements LogAppenderFactory { - @Override - public LogAppender getLogAppender(RaftServerImpl server, LeaderState state, - FollowerInfo f) { - return new LogAppender(server, state, f); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 30bfa04..7d9e049 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -28,6 +28,7 @@ import java.io.InterruptedIOException; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; @@ -100,7 +101,7 @@ public class RaftServerImpl implements RaftServer { private RaftServerRpc serverRpc; - private final LogAppenderFactory appenderFactory; + private final Supplier<ServerFactory> factory ; RaftServerImpl(RaftPeerId id, StateMachine stateMachine, RaftConfiguration raftConf, RaftProperties properties) @@ -117,7 +118,12 @@ public class RaftServerImpl implements RaftServer { this.properties = properties; this.stateMachine = stateMachine; this.state = new ServerState(id, raftConf, properties, this, stateMachine); - appenderFactory = initAppenderFactory(); + this.factory = RaftUtils.memoize( + () -> ServerFactory.Util.newServerFactory(getServerRpc().getRpcType(), properties)); + } + + ServerFactory getFactory() { + return factory.get(); } int getMinTimeoutMs() { @@ -137,18 +143,6 @@ public class RaftServerImpl implements RaftServer { return this.stateMachine; } - public LogAppenderFactory getLogAppenderFactory() { - return appenderFactory; - } - - private LogAppenderFactory initAppenderFactory() { - Class<? extends LogAppenderFactory> factoryClass = properties.getClass( - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT, - LogAppenderFactory.class); - return RaftUtils.newInstance(factoryClass); - } - /** * Used by tests to set initial raft configuration with correct port bindings. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java new file mode 100644 index 0000000..38caba7 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.RpcType; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.RaftUtils; + +import java.util.Objects; + +/** A factory interface for creating server components. */ +public interface ServerFactory { + /** Create a new {@link LogAppender}. */ + LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f); + + class BaseFactory implements ServerFactory { + @Override + public LogAppender newLogAppender( + RaftServerImpl server, LeaderState state, FollowerInfo f) { + return new LogAppender(server, state, f); + } + } + + class Util { + private static <T extends ServerFactory> Class<T> getClass( + RaftServerConfigKeys.Factory f, RaftProperties properties) { + final Class<T> defaultClass = (Class<T>) properties.getClassByNameOrNull(f.getDefaultClass()); + Objects.requireNonNull(defaultClass, () -> "Failed to get the default class for " + f); + return properties.getClass(f.getKey(), defaultClass, ServerFactory.class); + } + + /** Create a new {@link ServerFactory}. */ + public static <T extends ServerFactory> T newServerFactory( + RpcType rpcType, RaftProperties properties) { + return RaftUtils.newInstance( + getClass(RaftServerConfigKeys.Factory.valueOf(rpcType), properties)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 09d8493..c8257ac 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.ratis.RpcType; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; @@ -61,6 +62,11 @@ class SimulatedServerRpc implements RaftServerRpc { } @Override + public RpcType getRpcType() { + return RpcType.SIMULATED; + } + + @Override public void start() { serverHandler.startDaemon(); clientHandler.startDaemon();
