Repository: incubator-ratis Updated Branches: refs/heads/master 5a94eac05 -> 46116d412
RATIS-30. Provide a way to pass parameters to rpc implementations. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/46116d41 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/46116d41 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/46116d41 Branch: refs/heads/master Commit: 46116d412add72d761ebd6a9325362dc19227459 Parents: 5a94eac Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Fri Mar 3 17:30:18 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Fri Mar 3 17:30:18 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/RaftConfigKeys.java | 3 +- .../java/org/apache/ratis/conf/Parameters.java | 66 +++++++++ .../main/java/org/apache/ratis/rpc/RpcType.java | 5 +- .../org/apache/ratis/rpc/SupportedRpcType.java | 9 +- .../java/org/apache/ratis/util/RaftUtils.java | 44 +++--- .../ratis/examples/RaftExamplesTestUtil.java | 2 +- .../java/org/apache/ratis/grpc/GrpcFactory.java | 3 + .../ratis/grpc/MiniRaftClusterWithGRpc.java | 51 +++---- .../ratis/hadooprpc/HadoopConfigKeys.java | 68 ++++++++++ .../apache/ratis/hadooprpc/HadoopFactory.java | 25 +++- .../server/HadoopRpcServerConfigKeys.java | 56 -------- .../hadooprpc/server/HadoopRpcService.java | 5 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 47 +++---- .../org/apache/ratis/netty/NettyFactory.java | 3 + .../ratis/netty/MiniRaftClusterWithNetty.java | 24 ++-- .../org/apache/ratis/server/RaftServer.java | 12 +- .../ratis/server/impl/RaftServerImpl.java | 12 +- .../ratis/server/impl/ServerImplUtils.java | 13 +- .../java/org/apache/ratis/MiniRaftCluster.java | 133 +++++++++---------- .../impl/RaftReconfigurationBaseTest.java | 2 +- .../MiniRaftClusterWithSimulatedRpc.java | 95 +++++-------- .../simulation/SimulatedRequestReply.java | 11 +- .../ratis/server/simulation/SimulatedRpc.java | 32 +++-- 23 files 
changed, 412 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java index adf393b..a8bc57d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java +++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java @@ -43,7 +43,8 @@ public interface RaftConfigKeys { } // Try using it as a class name - return RaftUtils.newInstance(t, properties, RpcType.class); + return RaftUtils.newInstance( + RaftUtils.getClass(t, properties, RpcType.class)); } static void setType(BiConsumer<String, String> setRpcType, RpcType type) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java b/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java new file mode 100644 index 0000000..f562d24 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.conf; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A generic parameter map. + * The difference between this class and {@link RaftProperties} is that + * {@link RaftProperties} is {@link String} based, i.e. properties are strings, + * while this class is {@link Object} based, i.e. parameters can be any objects. + * + * Null keys or null values are not supported. + * + * This class is thread safe. + */ +public class Parameters { + private final Map<String, Object> map = new ConcurrentHashMap<>(); + + /** Put the key-value pair to the map. */ + public <T> T put(String key, T value, Class<T> valueClass) { + return valueClass.cast(map.put( + Objects.requireNonNull(key, "key is null"), + Objects.requireNonNull(value, () -> "value is null, key=" + key))); + } + + /** + * @param <T> The value type. + * @return The value mapped to the given key; + * or return null if the key does not map to any value. + * @throws ClassCastException if the mapped value is not an instance of the given class. + */ + public <T> T get(String key, Class<T> valueClass) { + final Object value = map.get(Objects.requireNonNull(key, "key is null")); + return valueClass.cast(value); + } + + /** + * The same as {@link #get(String, Class)} except that this method throws + * a {@link NullPointerException} if the key does not map to any value.
+ */ + public <T> T getNonNull(String key, Class<T> valueClass) { + final T value = get(key, valueClass); + if (value != null) { + return value; + } + throw new NullPointerException("The key " + key + " does not map to any value."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java index a8085fe..701e979 100644 --- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java +++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.rpc; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; /** The type of RPC implementations. */ @@ -24,8 +25,8 @@ public interface RpcType { /** @return the name of the rpc type. */ String name(); - /** @return a new factory created using the given properties. */ - RpcFactory newFactory(RaftProperties properties); + /** @return a new factory created using the given properties and parameters. */ + RpcFactory newFactory(RaftProperties properties, Parameters parameters); /** An interface to get {@link RpcType}. 
*/ interface Get { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java index dcba59b..f1d8fac 100644 --- a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java +++ b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.rpc; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.util.RaftUtils; @@ -31,6 +32,8 @@ public enum SupportedRpcType implements RpcType { return valueOf(s.toUpperCase()); } + private static final Class<?>[] ARG_CLASSES = {Parameters.class}; + private final String factoryClassName; SupportedRpcType(String factoryClassName) { @@ -38,7 +41,9 @@ public enum SupportedRpcType implements RpcType { } @Override - public RpcFactory newFactory(RaftProperties properties) { - return RaftUtils.newInstance(factoryClassName, properties, RpcFactory.class); + public RpcFactory newFactory(RaftProperties properties, Parameters parameters) { + final Class<? 
extends RpcFactory> clazz = RaftUtils.getClass( + factoryClassName, properties, RpcFactory.class); + return RaftUtils.newInstance(clazz, ARG_CLASSES, parameters); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java index 7f5703d..02b227c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java @@ -28,9 +28,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.Iterator; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -85,12 +83,13 @@ public abstract class RaftUtils { public static final boolean PPC_64 = System.getProperties().getProperty("os.arch").contains("ppc64"); + public static final Class<?>[] EMPTY_CLASSES = {}; /** * Cache of constructors for each class. Pins the classes so they * can't be garbage collected until ReflectionUtils can be collected. */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = - new ConcurrentHashMap<>(); + private static final Map<List<Class<?>>, Constructor<?>> CONSTRUCTOR_CACHE + = new ConcurrentHashMap<>(); public static InterruptedIOException toInterruptedIOException( String message, InterruptedException e) { @@ -120,31 +119,40 @@ public abstract class RaftUtils { /** * Create an object for the given class using its default constructor. 
+ */ + public static <T> T newInstance(Class<T> clazz) { + return newInstance(clazz, EMPTY_CLASSES); + } + + /** + * Create an object for the given class using the specified constructor. * * @param clazz class of which an object is created + * @param argClasses argument classes of the constructor + * @param args actual arguments to be passed to the constructor + * @param <T> class type of clazz * @return a new object */ - public static <T> T newInstance(Class<T> clazz) { + public static <T> T newInstance(Class<T> clazz, Class<?>[] argClasses, Object... args) { Objects.requireNonNull(clazz, "clazz == null"); try { + final List<Class<?>> key = new ArrayList<>(); + key.add(clazz); + key.addAll(Arrays.asList(argClasses)); + @SuppressWarnings("unchecked") - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); - if (meth == null) { - meth = clazz.getDeclaredConstructor(); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(clazz, meth); + Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(key); + if (ctor == null) { + ctor = clazz.getDeclaredConstructor(argClasses); + ctor.setAccessible(true); + CONSTRUCTOR_CACHE.put(key, ctor); } - return meth.newInstance(); + return ctor.newInstance(args); } catch (Exception e) { throw new RuntimeException(e); } } - public static <BASE, SUB extends BASE> SUB newInstance( - String subClassName, RaftProperties properties, Class<BASE> base) { - return newInstance(getClass(subClassName, properties, base)); - } - public static <BASE, SUB extends BASE> Class<SUB> getClass( String subClassName, RaftProperties properties, Class<BASE> base) { try { @@ -309,8 +317,8 @@ public abstract class RaftUtils { public static <INPUT, OUTPUT> Iterable<OUTPUT> as( Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) { - final Iterator<INPUT> i = iteration.iterator(); return () -> new Iterator<OUTPUT>() { + final Iterator<INPUT> i = iteration.iterator(); @Override public boolean hasNext() { return i.hasNext(); 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java index f54b766..7804353 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java @@ -37,7 +37,7 @@ public class RaftExamplesTestUtil { Collection<Object[]> clusters, MiniRaftCluster.Factory factory, String[] ids, RaftProperties properties) throws IOException { - clusters.add(new Object[]{factory.newCluster(ids, properties, true)}); + clusters.add(new Object[]{factory.newCluster(ids, properties)}); } public static Collection<Object[]> getMiniRaftClusters( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index 3ae2602..60d6ef6 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -18,12 +18,15 @@ package org.apache.ratis.grpc; import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.grpc.client.GrpcClientRpc; import org.apache.ratis.grpc.server.GRpcLogAppender; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.*; public class GrpcFactory implements ServerFactory, ClientFactory { + public GrpcFactory(Parameters parameters) {} + @Override public SupportedRpcType getRpcType() { return 
SupportedRpcType.GRPC; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index 4c7d74d..9676a48 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -21,61 +21,48 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.server.impl.BlockRequestHandlingInjection; -import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.*; +import org.apache.ratis.statemachine.StateMachine; -import java.util.Collection; +import java.io.IOException; public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { public static final Factory<MiniRaftClusterWithGRpc> FACTORY = new Factory<MiniRaftClusterWithGRpc>() { @Override public MiniRaftClusterWithGRpc newCluster( - String[] ids, RaftProperties prop, boolean formatted) { + String[] ids, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.GRPC); - return new MiniRaftClusterWithGRpc(ids, prop, formatted); + return new MiniRaftClusterWithGRpc(ids, prop); } }; public static final DelayLocalExecutionInjection sendServerRequestInjection = new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST); - private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties, - boolean 
formatted) { - super(ids, properties, formatted); + private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties) { + super(ids, properties, null); } @Override - protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { - final RaftServerImpl s = super.newRaftServer(id, format); - s.getProperties().setInt( - RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(s)); - return s; + protected RaftServerImpl newRaftServer( + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftProperties properties) throws IOException { + properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(id, conf)); + return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null); } @Override - protected Collection<RaftPeer> addNewPeers( - Collection<RaftServerImpl> newServers, boolean startService) { - final Collection<RaftPeer> peers = toRaftPeers(newServers); - for (RaftPeer p: peers) { - final RaftServerImpl server = servers.get(p.getId()); - if (!startService) { - BlockRequestHandlingInjection.getInstance().blockReplier(server.getId().toString()); - } else { - server.start(); - } + protected void startServer(RaftServerImpl server, boolean startService) { + final String id = server.getId().toString(); + if (startService) { + server.start(); + BlockRequestHandlingInjection.getInstance().unblockReplier(id); + } else { + BlockRequestHandlingInjection.getInstance().blockReplier(id); } - return peers; - } - - @Override - public void startServer(RaftPeerId id) { - super.startServer(id); - BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java new file mode 100644 index 0000000..cc09510 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.conf.ConfUtils; +import org.apache.ratis.conf.Parameters; + +import java.net.InetSocketAddress; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; + +/** Hadoop Rpc specific configuration properties. 
*/ +public interface HadoopConfigKeys { + String PREFIX = "raft.hadooprpc"; + + String CONF_KEY = PREFIX + ".conf"; + + static Configuration getConf( + BiFunction<String, Class<Configuration>, Configuration> getConf) { + return getConf.apply(CONF_KEY, Configuration.class); + } + + static void setConf(Parameters parameters, Configuration conf) { + parameters.put(CONF_KEY, conf, Configuration.class); + } + + /** IPC server configurations */ + interface Ipc { + String PREFIX = HadoopConfigKeys.PREFIX + ".ipc"; + + String ADDRESS_KEY = PREFIX + ".address"; + int DEFAULT_PORT = 10718; + String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT; + + String HANDLERS_KEY = PREFIX + ".handlers"; + int HANDLERS_DEFAULT = 10; + + static int handlers(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + HANDLERS_KEY, HANDLERS_DEFAULT, 1, null); + } + + static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) { + return ConfUtils.getInetSocketAddress(getTrimmed, + ADDRESS_KEY, ADDRESS_DEFAULT); + } + + static void setAddress(BiConsumer<String, String> setString, String address) { + ConfUtils.set(setString, ADDRESS_KEY, address); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java index 7b9e20f..ad866c8 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -19,6 +19,7 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.hadooprpc.client.HadoopClientRpc; 
import org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.rpc.SupportedRpcType; @@ -26,10 +27,24 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFactory { - private Configuration conf; + public static Parameters newRaftParameters(Configuration conf) { + final Parameters p = new Parameters(); + HadoopConfigKeys.setConf(p, conf); + return p; + } + + private final Configuration conf; + + public HadoopFactory(Parameters parameters) { + this(HadoopConfigKeys.getConf(parameters::get)); + } + + public HadoopFactory(Configuration conf) { + this.conf = conf != null? conf: new Configuration(); + } - public void setConf(Configuration conf) { - this.conf = conf; + public Configuration getConf() { + return conf; } @Override @@ -41,12 +56,12 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa public HadoopRpcService newRaftServerRpc(RaftServerImpl server) { return HadoopRpcService.newBuilder() .setServer(server) - .setConf(conf) + .setConf(getConf()) .build(); } @Override public HadoopClientRpc newRaftClientRpc() { - return new HadoopClientRpc(conf); + return new HadoopClientRpc(getConf()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java deleted file mode 100644 index bfdf05b..0000000 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.hadooprpc.server; - -import org.apache.ratis.conf.ConfUtils; - -import java.net.InetSocketAddress; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; - -public interface HadoopRpcServerConfigKeys { - String PREFIX = "raft.hadooprpc"; - - /** IPC server configurations */ - abstract class Ipc { - public static final String PREFIX = HadoopRpcServerConfigKeys.PREFIX + ".ipc"; - - public static final String ADDRESS_KEY = PREFIX + ".address"; - public static final int DEFAULT_PORT = 10718; - public static final String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT; - - public static final String HANDLERS_KEY = PREFIX + ".handlers"; - public static final int HANDLERS_DEFAULT = 10; - - public static int handlers(BiFunction<String, Integer, Integer> getInt) { - return ConfUtils.getInt(getInt, - HANDLERS_KEY, HANDLERS_DEFAULT, 1, null); - } - - public static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) { - return ConfUtils.getInetSocketAddress(getTrimmed, - ADDRESS_KEY, ADDRESS_DEFAULT); - } - - public static void setAddress( - BiConsumer<String, String> setString, - String address) { - ConfUtils.set(setString, ADDRESS_KEY, address); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index 87fd2e2..00d69aa 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc.server; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; import org.apache.hadoop.ipc.RPC; +import org.apache.ratis.hadooprpc.HadoopConfigKeys; import org.apache.ratis.hadooprpc.Proxy; import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB; import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; @@ -118,8 +119,8 @@ public class HadoopRpcService implements RaftServerRpc { private static RPC.Server newRpcServer( RaftServerProtocol serverProtocol, final Configuration conf) throws IOException { - final int handlerCount = HadoopRpcServerConfigKeys.Ipc.handlers(conf::getInt); - final InetSocketAddress address = HadoopRpcServerConfigKeys.Ipc.address(conf::getTrimmed); + final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf::getInt); + final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf::getTrimmed); final BlockingService service = RaftServerProtocolService.newReflectiveBlockingService( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index 76acafd..b3a607b 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -22,36 +22,39 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys; import org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerImplUtils; +import org.apache.ratis.statemachine.StateMachine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class); public static class Factory extends MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> { @Override - public MiniRaftClusterWithHadoopRpc newCluster( - String[] ids, RaftProperties prop, boolean formatted) { + public MiniRaftClusterWithHadoopRpc newCluster(String[] ids, RaftProperties prop) { final Configuration conf = new Configuration(); - return newCluster(ids, prop, conf, formatted); + return newCluster(ids, prop, conf); } public MiniRaftClusterWithHadoopRpc newCluster( int numServers, RaftProperties properties, Configuration conf) { - return newCluster(generateIds(numServers, 0), properties, conf, true); + return newCluster(generateIds(numServers, 0), properties, conf); } public MiniRaftClusterWithHadoopRpc newCluster( - String[] ids, 
RaftProperties prop, Configuration conf, boolean formatted) { + String[] ids, RaftProperties prop, Configuration conf) { RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.HADOOP); - HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0"); - return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted); + HadoopConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0"); + return new MiniRaftClusterWithHadoopRpc(ids, prop, conf); } } @@ -63,27 +66,21 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { private final Configuration hadoopConf; private MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties, - Configuration hadoopConf, boolean formatted) { - super(ids, properties, formatted); + Configuration hadoopConf) { + super(ids, properties, HadoopFactory.newRaftParameters(hadoopConf)); this.hadoopConf = hadoopConf; - getServers().stream().forEach(s -> setConf(s)); - ((HadoopFactory)clientFactory).setConf(hadoopConf); - } - - private void setConf(RaftServerImpl server) { - final Configuration conf = new Configuration(hadoopConf); - final String address = "0.0.0.0:" + getPort(server); - HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, address); - ((HadoopFactory)server.getFactory()).setConf(conf); } @Override - protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { - final RaftServerImpl s = super.newRaftServer(id, format); - if (hadoopConf != null) { - setConf(s); - } - return s; + protected RaftServerImpl newRaftServer( + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftProperties properties) throws IOException { + final Configuration hconf = new Configuration(hadoopConf); + final String address = "0.0.0.0:" + getPort(id, conf); + HadoopConfigKeys.Ipc.setAddress(hconf::set, address); + + return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, + HadoopFactory.newRaftParameters(hconf)); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java index 525b991..6dcfd15 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -18,6 +18,7 @@ package org.apache.ratis.netty; import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.netty.client.NettyClientRpc; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.rpc.SupportedRpcType; @@ -25,6 +26,8 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory { + public NettyFactory(Parameters parameters) {} + @Override public SupportedRpcType getRpcType() { return SupportedRpcType.NETTY; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 29857dd..697aef6 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -25,32 +25,36 @@ import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import 
org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerImplUtils; +import org.apache.ratis.statemachine.StateMachine; + +import java.io.IOException; public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { public static final Factory<MiniRaftClusterWithNetty> FACTORY = new Factory<MiniRaftClusterWithNetty>() { @Override - public MiniRaftClusterWithNetty newCluster( - String[] ids, RaftProperties prop, boolean formatted) { + public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.NETTY); - return new MiniRaftClusterWithNetty(ids, prop, formatted); + return new MiniRaftClusterWithNetty(ids, prop); } }; public static final DelayLocalExecutionInjection sendServerRequest = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST); - private MiniRaftClusterWithNetty( - String[] ids, RaftProperties properties, boolean formatted) { - super(ids, properties, formatted); + private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) { + super(ids, properties, null); } @Override - protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { - final RaftServerImpl s = super.newRaftServer(id, format); - NettyConfigKeys.Server.setPort(s.getProperties()::setInt, getPort(s)); - return s; + protected RaftServerImpl newRaftServer( + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftProperties properties) throws IOException { + NettyConfigKeys.Server.setPort(properties::setInt, getPort(id, conf)); + return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index 7417e0d..dbd32b7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -17,13 +17,13 @@ */ package org.apache.ratis.server; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; import org.apache.ratis.protocol.RaftClientProtocol; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.ServerFactory; import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.server.protocol.RaftServerProtocol; @@ -65,6 +65,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, private StateMachine stateMachine; private Iterable<RaftPeer> peers; private RaftProperties properties; + private Parameters parameters; /** @return a {@link RaftServer} object. */ public RaftServer build() throws IOException { @@ -72,7 +73,8 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, Objects.requireNonNull(serverId, "The 'serverId' field is not initialized."), Objects.requireNonNull(stateMachine, "The 'stateMachine' is not initialized."), Objects.requireNonNull(peers, "The 'peers' field is not initialized."), - Objects.requireNonNull(properties, "The 'properties' field is not initialized.")); + Objects.requireNonNull(properties, "The 'properties' field is not initialized."), + parameters); } /** Set the server ID. */ @@ -98,5 +100,11 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, this.properties = properties; return this; } + + /** Set {@link Parameters}. 
*/ + public Builder setParameters(Parameters parameters) { + this.parameters = parameters; + return this; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d729914..b9f063e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.impl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; @@ -47,7 +48,6 @@ import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; import static org.apache.ratis.util.LifeCycle.State.*; @@ -84,12 +84,12 @@ public class RaftServerImpl implements RaftServer { /** used when the peer is leader */ private volatile LeaderState leaderState; - private final Supplier<RaftServerRpc> serverRpc; + private final RaftServerRpc serverRpc; private final ServerFactory factory; RaftServerImpl(RaftPeerId id, StateMachine stateMachine, - RaftConfiguration raftConf, RaftProperties properties) + RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) throws IOException { this.lifeCycle = new LifeCycle(id); minTimeoutMs = properties.getInt( @@ -105,8 +105,8 @@ public class 
RaftServerImpl implements RaftServer { this.state = new ServerState(id, raftConf, properties, this, stateMachine); final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); - this.factory = ServerFactory.cast(rpcType.newFactory(properties)); - this.serverRpc = RaftUtils.memoize(() -> initRaftServerRpc()); + this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters)); + this.serverRpc = initRaftServerRpc(); } @Override @@ -147,7 +147,7 @@ public class RaftServerImpl implements RaftServer { } public RaftServerRpc getServerRpc() { - return serverRpc.get(); + return serverRpc; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 5248906..30d6e29 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -30,17 +31,17 @@ import java.io.IOException; /** Server utilities for internal use. 
*/ public class ServerImplUtils { public static RaftServer newRaftServer( - RaftPeerId id, StateMachine stateMachine, - Iterable<RaftPeer> peers, RaftProperties properties) throws IOException { + RaftPeerId id, StateMachine stateMachine, Iterable<RaftPeer> peers, + RaftProperties properties, Parameters parameters) throws IOException { return newRaftServer(id, stateMachine, RaftConfiguration.newBuilder().setConf(peers).build(), - properties); + properties, parameters); } public static RaftServerImpl newRaftServer( - RaftPeerId id, StateMachine stateMachine, - RaftConfiguration conf, RaftProperties properties) throws IOException { - return new RaftServerImpl(id, stateMachine, conf, properties); + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftProperties properties, Parameters parameters) throws IOException { + return new RaftServerImpl(id, stateMachine, conf, properties, parameters); } public static TermIndex newTermIndex(long term, long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 9c566a9..6d14c24 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -20,6 +20,7 @@ package org.apache.ratis; import com.google.common.base.Preconditions; import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -42,7 +43,9 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.*; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; @@ -59,22 +62,16 @@ public abstract class MiniRaftCluster { public static abstract class Factory<CLUSTER extends MiniRaftCluster> { public abstract CLUSTER newCluster( - String[] ids, RaftProperties prop, boolean formatted); + String[] ids, RaftProperties prop); public CLUSTER newCluster(int numServer, RaftProperties prop) { - return newCluster(generateIds(numServer, 0), prop, true); + return newCluster(generateIds(numServer, 0), prop); } } public static abstract class RpcBase extends MiniRaftCluster { - public RpcBase(String[] ids, RaftProperties properties, boolean formatted) { - super(ids, properties, formatted); - } - - @Override - public void restartServer(String id, boolean format) throws IOException { - super.restartServer(id, format); - getServer(id).start(); + public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) { + super(ids, properties, parameters); } @Override @@ -82,12 +79,6 @@ public abstract class MiniRaftCluster { RaftTestUtil.setBlockRequestsFrom(src, block); } - public static int getPort(RaftServerImpl server) { - final int port = getPort(server.getId(), server.getState().getRaftConf()); - LOG.info(server.getId() + "(" + server.getRpcType() + "), port=" + port); - return port; - } - public static int getPort(RaftPeerId id, RaftConfiguration conf) { final RaftPeer peer = conf.getPeer(id); final String address = peer != null? 
peer.getAddress(): null; @@ -144,27 +135,46 @@ public abstract class MiniRaftCluster { protected final ClientFactory clientFactory; protected RaftConfiguration conf; protected final RaftProperties properties; + protected final Parameters parameters; private final String testBaseDir; - protected final Map<RaftPeerId, RaftServerImpl> servers = - Collections.synchronizedMap(new LinkedHashMap<>()); + protected final Map<RaftPeerId, RaftServerImpl> servers = new ConcurrentHashMap<>(); - public MiniRaftCluster(String[] ids, RaftProperties properties, - boolean formatted) { + protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { this.conf = initConfiguration(ids); this.properties = new RaftProperties(properties); + this.parameters = parameters; final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); - this.clientFactory = ClientFactory.cast(rpcType.newFactory(properties)); + this.clientFactory = ClientFactory.cast( + rpcType.newFactory(properties, parameters)); this.testBaseDir = getBaseDirectory(); - conf.getPeers().forEach( - p -> servers.put(p.getId(), newRaftServer(p.getId(), formatted))); - ExitUtils.disableSystemExit(); } + public MiniRaftCluster initServers() { + if (servers.isEmpty()) { + putNewServers(RaftUtils.as(conf.getPeers(), RaftPeer::getId), true); + } + return this; + } + + private RaftServerImpl putNewServer(RaftPeerId id, boolean format) { + final RaftServerImpl s = newRaftServer(id, format); + Preconditions.checkState(servers.put(id, s) == null); + return s; + } + + private Collection<RaftServerImpl> putNewServers( + Iterable<RaftPeerId> peers, boolean format) { + return StreamSupport.stream(peers.spliterator(), false) + .map(id -> putNewServer(id, format)) + .collect(Collectors.toList()); + } + public void start() { LOG.info("Starting " + getClass().getSimpleName()); + initServers(); servers.values().forEach(RaftServerImpl::start); } @@ -175,24 +185,19 @@ public abstract class MiniRaftCluster { final 
RaftPeerId newId = new RaftPeerId(id); killServer(newId); servers.remove(newId); - servers.put(newId, newRaftServer(newId, format)); + + startServer(putNewServer(newId, format), true); } - public final void restart(boolean format) throws IOException { + public void restart(boolean format) throws IOException { servers.values().stream().filter(RaftServerImpl::isAlive) .forEach(RaftServerImpl::close); List<RaftPeerId> idList = new ArrayList<>(servers.keySet()); - for (RaftPeerId id : idList) { - servers.remove(id); - servers.put(id, newRaftServer(id, format)); - } - - initRpc(); + servers.clear(); + putNewServers(idList, format); start(); } - protected void initRpc() {} - public int getMaxTimeout() { return properties.getInt( RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, @@ -203,7 +208,7 @@ public abstract class MiniRaftCluster { return conf; } - protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { + private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { try { final String dirStr = testBaseDir + id; if (format) { @@ -212,12 +217,16 @@ public abstract class MiniRaftCluster { final RaftProperties prop = new RaftProperties(properties); prop.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); final StateMachine stateMachine = getStateMachine4Test(properties); - return ServerImplUtils.newRaftServer(id, stateMachine, conf, prop); + return newRaftServer(id, stateMachine, conf, prop); } catch (IOException e) { throw new RuntimeException(e); } } + protected abstract RaftServerImpl newRaftServer( + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftProperties properties) throws IOException; + static StateMachine getStateMachine4Test(RaftProperties properties) { final Class<? 
extends StateMachine> smClass = properties.getClass( STATEMACHINE_CLASS_KEY, @@ -229,17 +238,12 @@ public abstract class MiniRaftCluster { public static Collection<RaftPeer> toRaftPeers( Collection<RaftServerImpl> servers) { return servers.stream() - .map(s -> new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress())) + .map(MiniRaftCluster::toRaftPeer) .collect(Collectors.toList()); } - protected Collection<RaftPeer> addNewPeers( - Collection<RaftServerImpl> newServers, boolean startService) { - final Collection<RaftPeer> peers = toRaftPeers(newServers); - if (startService) { - newServers.forEach(RaftServerImpl::start); - } - return peers; + public static RaftPeer toRaftPeer(RaftServerImpl s) { + return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()); } public PeerChanges addNewPeers(int number, boolean startNewPeer) @@ -252,18 +256,11 @@ public abstract class MiniRaftCluster { LOG.info("Add new peers {}", Arrays.asList(ids)); // create and add new RaftServers - final List<RaftServerImpl> newServers = new ArrayList<>(ids.length); - for (String id : ids) { - Preconditions.checkArgument(!servers.containsKey(id)); - - final RaftPeerId peerId = new RaftPeerId(id); - final RaftServerImpl newServer = newRaftServer(peerId, true); - servers.put(peerId, newServer); - newServers.add(newServer); - } - - final Collection<RaftPeer> newPeers = addNewPeers(newServers, startNewPeer); + final Collection<RaftServerImpl> newServers = putNewServers( + RaftUtils.as(Arrays.asList(ids), RaftPeerId::new), true); + newServers.forEach(s -> startServer(s, startNewPeer)); + final Collection<RaftPeer> newPeers = toRaftPeers(newServers); final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]); newPeers.addAll(conf.getPeers()); conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build(); @@ -271,14 +268,14 @@ public abstract class MiniRaftCluster { return new PeerChanges(p, np, new RaftPeer[0]); } - public void startServer(RaftPeerId 
id) { - RaftServerImpl server = servers.get(id); - assert server != null; - server.start(); + protected void startServer(RaftServerImpl server, boolean startService) { + if (startService) { + server.start(); + } } - private RaftPeer getPeer(RaftServerImpl s) { - return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()); + public void startServer(RaftPeerId id) { + startServer(getServer(id), true); } /** @@ -289,7 +286,7 @@ public abstract class MiniRaftCluster { Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers()); List<RaftPeer> removedPeers = new ArrayList<>(number); if (removeLeader) { - final RaftPeer leader = getPeer(getLeader()); + final RaftPeer leader = toRaftPeer(getLeader()); assert !excluded.contains(leader); peers.remove(leader); removedPeers.add(leader); @@ -297,7 +294,7 @@ public abstract class MiniRaftCluster { List<RaftServerImpl> followers = getFollowers(); for (int i = 0, removed = 0; i < followers.size() && removed < (removeLeader ? number - 1 : number); i++) { - RaftPeer toRemove = getPeer(followers.get(i)); + RaftPeer toRemove = toRaftPeer(followers.get(i)); if (!excluded.contains(toRemove)) { peers.remove(toRemove); removedPeers.add(toRemove); @@ -381,13 +378,15 @@ public abstract class MiniRaftCluster { } public RaftServerImpl getServer(String id) { - return servers.get(new RaftPeerId(id)); + return getServer(new RaftPeerId(id)); + } + + public RaftServerImpl getServer(RaftPeerId id) { + return servers.get(id); } public Collection<RaftPeer> getPeers() { - return getServers().stream().map(s -> - new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress())) - .collect(Collectors.toList()); + return toRaftPeers(getServers()); } public RaftClient createClient(RaftPeerId leaderId) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 3017634..e3db854 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -583,7 +583,7 @@ public abstract class RaftReconfigurationBaseTest { @Test public void testLeaderNotReadyException() throws Exception { LOG.info("Start testLeaderNotReadyException"); - final MiniRaftCluster cluster = getCluster(1); + final MiniRaftCluster cluster = getCluster(1).initServers(); final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId(); try { // delay 1s for each logSync call http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index e33d64f..3ed2596 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -19,16 +19,17 @@ package org.apache.ratis.server.simulation; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerImplUtils; +import 
org.apache.ratis.statemachine.StateMachine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { @@ -38,76 +39,52 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { = new Factory<MiniRaftClusterWithSimulatedRpc>() { @Override public MiniRaftClusterWithSimulatedRpc newCluster( - String[] ids, RaftProperties prop, boolean formatted) { + String[] ids, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop::set, SimulatedRpc.INSTANCE); if (ThreadLocalRandom.current().nextBoolean()) { // turn off simulate latency half of the times. prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0); } - return new MiniRaftClusterWithSimulatedRpc(ids, prop, formatted); - } - }; - - private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; - private SimulatedClientRpc client2serverRequestReply; - - private MiniRaftClusterWithSimulatedRpc(String[] ids, - RaftProperties properties, boolean formatted) { - super(ids, properties, formatted); - initRpc(); - } - - @Override - protected void initRpc() { - final int simulateLatencyMs = properties.getInt( - SimulatedRequestReply.SIMULATE_LATENCY_KEY, - SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT); - LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = " - + simulateLatencyMs); - serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs); - client2serverRequestReply = new SimulatedClientRpc(simulateLatencyMs); - getServers().stream().forEach(s -> initRpc(s)); - addPeersToRpc(toRaftPeers(getServers())); - ((SimulatedRpc.Factory)clientFactory).initRpc( - serverRequestReply, client2serverRequestReply); - } - - private void initRpc(RaftServerImpl s) { - if (serverRequestReply != null) { - ((SimulatedRpc.Factory)s.getFactory()).initRpc( + final int simulateLatencyMs = 
ConfUtils.getInt(prop::getInt, + SimulatedRequestReply.SIMULATE_LATENCY_KEY, + SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT, 0, null); + final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply + = new SimulatedRequestReply<>(simulateLatencyMs); + final SimulatedClientRpc client2serverRequestReply + = new SimulatedClientRpc(simulateLatencyMs); + return new MiniRaftClusterWithSimulatedRpc(ids, prop, serverRequestReply, client2serverRequestReply); } - } - - private void addPeersToRpc(Collection<RaftPeer> peers) { - serverRequestReply.addPeers(peers); - client2serverRequestReply.addPeers(peers); - } + }; - @Override - protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { - final RaftServerImpl s = super.newRaftServer(id, format); - initRpc(s); - return s; + private final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; + private final SimulatedClientRpc client2serverRequestReply; + + private MiniRaftClusterWithSimulatedRpc( + String[] ids, RaftProperties properties, + SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, + SimulatedClientRpc client2serverRequestReply) { + super(ids, properties, + SimulatedRpc.Factory.newRaftParameters(serverRequestReply, client2serverRequestReply)); + this.serverRequestReply = serverRequestReply; + this.client2serverRequestReply = client2serverRequestReply; } @Override - public void restartServer(String id, boolean format) throws IOException { - super.restartServer(id, format); - RaftServerImpl s = getServer(id); - addPeersToRpc(Collections.singletonList(conf.getPeer(new RaftPeerId(id)))); - s.start(); + public void restart(boolean format) throws IOException { + serverRequestReply.clear(); + client2serverRequestReply.clear(); + super.restart(format); } @Override - public Collection<RaftPeer> addNewPeers( - Collection<RaftServerImpl> newServers, boolean startService) { - final Collection<RaftPeer> newPeers = toRaftPeers(newServers); - 
addPeersToRpc(newPeers); - if (startService) { - newServers.forEach(RaftServerImpl::start); - } - return newPeers; + protected RaftServerImpl newRaftServer( + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftProperties properties) throws IOException { + serverRequestReply.addPeer(id); + client2serverRequestReply.addPeer(id); + return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, + parameters); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java index 95d3efa..64aaeac 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.simulation; import com.google.common.base.Preconditions; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftRpcMessage; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.RaftUtils; @@ -175,10 +176,12 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage, queues.remove(id); } - public void addPeers(Collection<RaftPeer> newPeers) { - for (RaftPeer peer : newPeers) { - queues.put(peer.getId().toString(), new EventQueue<>()); - } + public void clear() { + queues.clear(); + } + + public void addPeer(RaftPeerId newPeer) { + queues.put(newPeer.toString(), new EventQueue<>()); } private void simulateLatency() throws IOException { 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java index 67193b4..69707aa 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java @@ -18,8 +18,8 @@ package org.apache.ratis.server.simulation; import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.rpc.RpcFactory; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; @@ -35,19 +35,31 @@ class SimulatedRpc implements RpcType { } @Override - public RpcFactory newFactory(RaftProperties properties) { - return new Factory(); + public Factory newFactory(RaftProperties properties, Parameters parameters) { + return new Factory(parameters); } static class Factory extends ServerFactory.BaseFactory implements ClientFactory { - private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; - private SimulatedClientRpc client2serverRequestReply; + static String SERVER_REQUEST_REPLY_KEY = "raft.simulated.serverRequestReply"; + static String CLIENT_TO_SERVER_REQUEST_REPLY_KEY = "raft.simulated.client2serverRequestReply"; - public void initRpc( - SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, - SimulatedClientRpc client2serverRequestReply) { - this.serverRequestReply = Objects.requireNonNull(serverRequestReply); - this.client2serverRequestReply = Objects.requireNonNull(client2serverRequestReply); + static Parameters newRaftParameters( + 
SimulatedRequestReply<RaftServerRequest, RaftServerReply> server, + SimulatedClientRpc client2server) { + final Parameters p = new Parameters(); + p.put(SERVER_REQUEST_REPLY_KEY, server, SimulatedRequestReply.class); + p.put(CLIENT_TO_SERVER_REQUEST_REPLY_KEY, client2server, SimulatedClientRpc.class); + return p; + } + + private final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; + private final SimulatedClientRpc client2serverRequestReply; + + Factory(Parameters parameters) { + serverRequestReply = parameters.getNonNull( + SERVER_REQUEST_REPLY_KEY, SimulatedRequestReply.class); + client2serverRequestReply = parameters.getNonNull( + CLIENT_TO_SERVER_REQUEST_REPLY_KEY, SimulatedClientRpc.class); } @Override
