This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit f313e077daf1d219690d946c5962bd51d33ddff6 Author: Yaolong Liu <[email protected]> AuthorDate: Sun Jul 24 06:31:45 2022 +0800 RATIS-1635. Support listener in MiniRaftCluster (#692) (cherry picked from commit 7e695606aefa3c5d3c7741d1128db24ecfba9502) --- .../ratis/examples/ParameterizedBaseTest.java | 2 +- .../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 8 +- .../ratis/netty/MiniRaftClusterWithNetty.java | 8 +- .../apache/ratis/server/impl/MiniRaftCluster.java | 97 ++++++++++++++++------ .../MiniRaftClusterWithSimulatedRpc.java | 10 +-- ...usterWithRpcTypeGrpcAndDataStreamTypeNetty.java | 9 +- ...sterWithRpcTypeNettyAndDataStreamTypeNetty.java | 9 +- 7 files changed, 94 insertions(+), 49 deletions(-) diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java index 72f4ee4b..9352a24e 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java @@ -77,7 +77,7 @@ public abstract class ParameterizedBaseTest extends BaseTest { private static void add( Collection<Object[]> clusters, MiniRaftCluster.Factory factory, String[] ids, RaftProperties properties) { - clusters.add(new Object[]{factory.newCluster(ids, properties)}); + clusters.add(new Object[]{factory.newCluster(ids, new String[] {}, properties)}); } public static Collection<Object[]> getMiniRaftClusters( diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 097a7627..18c65c5b 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -39,9 +39,9 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase { public static final Factory<MiniRaftClusterWithGrpc> FACTORY = new Factory<MiniRaftClusterWithGrpc>() { @Override - public MiniRaftClusterWithGrpc newCluster(String[] ids, RaftProperties prop) { + public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC); - return new MiniRaftClusterWithGrpc(ids, prop, null); + return new MiniRaftClusterWithGrpc(ids, listenerIds, prop, null); } }; @@ -55,8 +55,8 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase { public static final DelayLocalExecutionInjection sendServerRequestInjection = new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST); - protected MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties, Parameters parameters) { - super(ids, properties, parameters); + protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) { + super(ids, listenerIds, properties, parameters); } @Override diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 16ccffd6..990b63d9 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -35,9 +35,9 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { public static final Factory<MiniRaftClusterWithNetty> FACTORY = new Factory<MiniRaftClusterWithNetty>() { @Override - public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) { + public MiniRaftClusterWithNetty newCluster(String[] ids, String[] listenerIds, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY); - return new MiniRaftClusterWithNetty(ids, prop); + return new MiniRaftClusterWithNetty(ids, listenerIds, prop); } }; @@ -51,8 +51,8 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { public static final DelayLocalExecutionInjection sendServerRequest = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST); - protected MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) { - super(ids, properties, null); + protected MiniRaftClusterWithNetty(String[] ids, String[] listenerIds, RaftProperties properties) { + super(ids, listenerIds, properties, null); } @Override diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 39683f6f..f5cd38b3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; @@ -106,18 +107,30 @@ public abstract class MiniRaftCluster implements Closeable { } default CLUSTER newCluster(int numPeers) { - return getFactory().newCluster(numPeers, getProperties()); + return newCluster(numPeers, 0); + } + + default CLUSTER newCluster(int numPeers, int numListeners) { + return getFactory().newCluster(numPeers, numListeners, getProperties()); } default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { - runWithNewCluster(numServers, true, testCase); + runWithNewCluster(numServers, 0, true, testCase); } - default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase) + default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { + runWithNewCluster(numServers, 0, startCluster, testCase); + } + + default void runWithNewCluster(int numServers, int numListeners, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { + runWithNewCluster(numServers, numListeners, true, testCase); + } + + default void runWithNewCluster(int numServers, int numListeners, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); LOG.info("Running " + caller.getMethodName()); - final CLUSTER cluster = newCluster(numServers); + final CLUSTER cluster = newCluster(numServers, numListeners); try { if (startCluster) { cluster.start(); @@ -133,11 +146,15 @@ public abstract class MiniRaftCluster implements Closeable { } default void runWithSameCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { + runWithSameCluster(numServers, 0, testCase); + } + + default void runWithSameCluster(int numServers, int numListeners, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); LOG.info("Running " + caller.getMethodName()); CLUSTER cluster = null; try { - cluster = getFactory().reuseCluster(numServers, getProperties()); + cluster = getFactory().reuseCluster(numServers, numListeners, getProperties()); testCase.accept(cluster); } catch(Exception t) { if (cluster != null) { @@ -151,14 +168,14 @@ public abstract class MiniRaftCluster implements Closeable { private final AtomicReference<CLUSTER> reusableCluster = new AtomicReference<>(); - private CLUSTER reuseCluster(int numServers, RaftProperties prop) throws IOException { + private CLUSTER reuseCluster(int numServers, int numListeners, RaftProperties prop) throws IOException { for(;;) { final CLUSTER cluster = reusableCluster.get(); if (cluster != null) { return cluster; } - final CLUSTER newCluster = newCluster(numServers, prop); + final CLUSTER newCluster = newCluster(numServers, numListeners, prop); if (reusableCluster.compareAndSet(null, newCluster)) { newCluster.start(); Runtime.getRuntime().addShutdownHook(new Thread(newCluster::shutdown)); @@ -168,16 +185,20 @@ public abstract class MiniRaftCluster implements Closeable { } public abstract CLUSTER newCluster( - String[] ids, RaftProperties prop); + String[] ids, String[] listenerIds, RaftProperties prop); public CLUSTER newCluster(int numServer, RaftProperties prop) { - return newCluster(generateIds(numServer, 0), prop); + return newCluster(numServer, 0, prop); + } + + public CLUSTER newCluster(int numServer, int numListeners, RaftProperties prop) { + return newCluster(generateIds(numServer, 0), generateIds(numListeners, numServer), prop); } } public static abstract class RpcBase extends MiniRaftCluster { - public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) { - super(ids, properties, parameters); + public RpcBase(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) { + super(ids, listenerIds, properties, parameters); } @Override @@ -223,17 +244,27 @@ public abstract class MiniRaftCluster implements Closeable { } } - public static RaftGroup initRaftGroup(Collection<String> ids) { - Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * ids.size()).iterator(); - final RaftPeer[] peers = ids.stream() - .map(RaftPeerId::valueOf) - .map(id -> RaftPeer.newBuilder().setId(id) - .setAddress(addresses.next()) - .setAdminAddress(addresses.next()) - .setClientAddress(addresses.next()) - .setDataStreamAddress(addresses.next()) - .build()) - .toArray(RaftPeer[]::new); + public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) { + Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * (ids.size() + listenerIds.size())).iterator(); + Stream<RaftPeer> peer = ids.stream() + .map(RaftPeerId::valueOf) + .map(id -> RaftPeer.newBuilder().setId(id) + .setAddress(addresses.next()) + .setAdminAddress(addresses.next()) + .setClientAddress(addresses.next()) + .setDataStreamAddress(addresses.next()) + .build()); + Stream<RaftPeer> listener = listenerIds.stream() + .map(RaftPeerId::valueOf) + .map(id -> RaftPeer.newBuilder().setId(id) + .setAddress(addresses.next()) + .setAdminAddress(addresses.next()) + .setClientAddress(addresses.next()) + .setDataStreamAddress(addresses.next()) + .setStartupRole(RaftProtos.RaftPeerRole.LISTENER) + .build()); + final RaftPeer[] peers = Stream.concat(peer, listener).toArray(RaftPeer[]::new); + return RaftGroup.valueOf(RaftGroupId.randomId(), peers); } @@ -267,8 +298,8 @@ public abstract class MiniRaftCluster implements Closeable { private final AtomicReference<Timer> timer = new AtomicReference<>(); - protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { - this.group = initRaftGroup(Arrays.asList(ids)); + protected MiniRaftCluster(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) { + this.group = initRaftGroup(Arrays.asList(ids), Arrays.asList(listenerIds)); LOG.info("new {} with {}", JavaUtils.getClassSimpleName(getClass()), group); this.properties = new RaftProperties(properties); this.parameters = parameters; @@ -410,11 +441,22 @@ public abstract class MiniRaftCluster implements Closeable { public PeerChanges addNewPeers(int number, boolean startNewPeer, boolean emptyPeer) throws IOException { - return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer); + return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer, + RaftProtos.RaftPeerRole.FOLLOWER); } public PeerChanges addNewPeers(String[] ids, boolean startNewPeer, boolean emptyPeer) throws IOException { + return addNewPeers(ids, startNewPeer, emptyPeer, RaftProtos.RaftPeerRole.FOLLOWER); + } + + public PeerChanges addNewPeers(int number, boolean startNewPeer, + boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException { + return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer, startRole); + } + + public PeerChanges addNewPeers(String[] ids, boolean startNewPeer, + boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException { LOG.info("Add new peers {}", Arrays.asList(ids)); final Iterable<RaftPeerId> peerIds = CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf); @@ -423,8 +465,9 @@ public abstract class MiniRaftCluster implements Closeable { raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList()); } else { final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false) - .map(id -> RaftPeer.newBuilder().setId(id).build()) - .collect(Collectors.toSet()); + .map(id -> RaftPeer.newBuilder().setId(id) + .setStartupRole(startRole) + .build()).collect(Collectors.toSet()); newPeers.addAll(group.getPeers()); raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 907ea515..437da929 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -40,8 +40,8 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { public static final Factory<MiniRaftClusterWithSimulatedRpc> FACTORY = new Factory<MiniRaftClusterWithSimulatedRpc>() { @Override - public MiniRaftClusterWithSimulatedRpc newCluster( - String[] ids, RaftProperties prop) { + public MiniRaftClusterWithSimulatedRpc newCluster(String[] ids, String[] listenerIds, + RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop, SimulatedRpc.INSTANCE); if (ThreadLocalRandom.current().nextBoolean()) { // turn off simulate latency half of the times. @@ -54,7 +54,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { = new SimulatedRequestReply<>(simulateLatencyMs); final SimulatedClientRpc client2serverRequestReply = new SimulatedClientRpc(simulateLatencyMs); - return new MiniRaftClusterWithSimulatedRpc(ids, prop, + return new MiniRaftClusterWithSimulatedRpc(ids, listenerIds, prop, serverRequestReply, client2serverRequestReply); } }; @@ -70,10 +70,10 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { private final SimulatedClientRpc client2serverRequestReply; private MiniRaftClusterWithSimulatedRpc( - String[] ids, RaftProperties properties, + String[] ids, String[] listenerIds, RaftProperties properties, SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedClientRpc client2serverRequestReply) { - super(ids, properties, + super(ids, listenerIds, properties, SimulatedRpc.Factory.newRaftParameters(serverRequestReply, client2serverRequestReply)); this.serverRequestReply = serverRequestReply; this.client2serverRequestReply = client2serverRequestReply; diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java index 1a210d8a..3396ada9 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java @@ -40,10 +40,11 @@ public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRa } @Override - public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) { + public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty newCluster(String[] ids, + String[] listenerIds, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC); RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY); - return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids, prop, parameters); + return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids, listenerIds, prop, parameters); } } @@ -56,9 +57,9 @@ public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRa } } - private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids, RaftProperties properties, + private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) { - super(ids, properties, parameters); + super(ids, listenerIds, properties, parameters); } @Override diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java index dc3465cd..1e5149b4 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java @@ -34,10 +34,11 @@ public class MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniR public static final Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> FACTORY = new Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>() { @Override - public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) { + public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty newCluster(String[] ids, + String[] listenerIds, RaftProperties prop) { RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY); RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY); - return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids, prop); + return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids, listenerIds, prop); } }; @@ -48,8 +49,8 @@ public class MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniR } } - private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids, RaftProperties properties) { - super(ids, properties); + private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids, String[] listenerIds, RaftProperties properties) { + super(ids, listenerIds, properties); } @Override
