This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7e695606 RATIS-1635. Support listener in MiniRaftCluster (#692)
7e695606 is described below
commit 7e695606aefa3c5d3c7741d1128db24ecfba9502
Author: Yaolong Liu <[email protected]>
AuthorDate: Sun Jul 24 06:31:45 2022 +0800
RATIS-1635. Support listener in MiniRaftCluster (#692)
---
.../ratis/examples/ParameterizedBaseTest.java | 2 +-
.../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 8 +-
.../ratis/netty/MiniRaftClusterWithNetty.java | 8 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 97 ++++++++++++++++------
.../MiniRaftClusterWithSimulatedRpc.java | 10 +--
...usterWithRpcTypeGrpcAndDataStreamTypeNetty.java | 9 +-
...sterWithRpcTypeNettyAndDataStreamTypeNetty.java | 9 +-
7 files changed, 94 insertions(+), 49 deletions(-)
diff --git
a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index 72f4ee4b..9352a24e 100644
---
a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++
b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -77,7 +77,7 @@ public abstract class ParameterizedBaseTest extends BaseTest {
private static void add(
Collection<Object[]> clusters, MiniRaftCluster.Factory factory,
String[] ids, RaftProperties properties) {
- clusters.add(new Object[]{factory.newCluster(ids, properties)});
+ clusters.add(new Object[]{factory.newCluster(ids, new String[] {},
properties)});
}
public static Collection<Object[]> getMiniRaftClusters(
diff --git
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 097a7627..18c65c5b 100644
---
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -39,9 +39,9 @@ public class MiniRaftClusterWithGrpc extends
MiniRaftCluster.RpcBase {
public static final Factory<MiniRaftClusterWithGrpc> FACTORY
= new Factory<MiniRaftClusterWithGrpc>() {
@Override
- public MiniRaftClusterWithGrpc newCluster(String[] ids, RaftProperties
prop) {
+ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[]
listenerIds, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
- return new MiniRaftClusterWithGrpc(ids, prop, null);
+ return new MiniRaftClusterWithGrpc(ids, listenerIds, prop, null);
}
};
@@ -55,8 +55,8 @@ public class MiniRaftClusterWithGrpc extends
MiniRaftCluster.RpcBase {
public static final DelayLocalExecutionInjection sendServerRequestInjection =
new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
- protected MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties,
Parameters parameters) {
- super(ids, properties, parameters);
+ protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds,
RaftProperties properties, Parameters parameters) {
+ super(ids, listenerIds, properties, parameters);
}
@Override
diff --git
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 16ccffd6..990b63d9 100644
---
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -35,9 +35,9 @@ public class MiniRaftClusterWithNetty extends
MiniRaftCluster.RpcBase {
public static final Factory<MiniRaftClusterWithNetty> FACTORY
= new Factory<MiniRaftClusterWithNetty>() {
@Override
- public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties
prop) {
+ public MiniRaftClusterWithNetty newCluster(String[] ids, String[]
listenerIds, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
- return new MiniRaftClusterWithNetty(ids, prop);
+ return new MiniRaftClusterWithNetty(ids, listenerIds, prop);
}
};
@@ -51,8 +51,8 @@ public class MiniRaftClusterWithNetty extends
MiniRaftCluster.RpcBase {
public static final DelayLocalExecutionInjection sendServerRequest
= new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
- protected MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
- super(ids, properties, null);
+ protected MiniRaftClusterWithNetty(String[] ids, String[] listenerIds,
RaftProperties properties) {
+ super(ids, listenerIds, properties, null);
}
@Override
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 39683f6f..f5cd38b3 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -106,18 +107,30 @@ public abstract class MiniRaftCluster implements
Closeable {
}
default CLUSTER newCluster(int numPeers) {
- return getFactory().newCluster(numPeers, getProperties());
+ return newCluster(numPeers, 0);
+ }
+
+ default CLUSTER newCluster(int numPeers, int numListeners) {
+ return getFactory().newCluster(numPeers, numListeners,
getProperties());
}
default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER,
Exception> testCase) throws Exception {
- runWithNewCluster(numServers, true, testCase);
+ runWithNewCluster(numServers, 0, true, testCase);
}
- default void runWithNewCluster(int numServers, boolean startCluster,
CheckedConsumer<CLUSTER, Exception> testCase)
+ default void runWithNewCluster(int numServers, boolean startCluster,
CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+ runWithNewCluster(numServers, 0, startCluster, testCase);
+ }
+
+ default void runWithNewCluster(int numServers, int numListeners,
CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+ runWithNewCluster(numServers, numListeners, true, testCase);
+ }
+
+ default void runWithNewCluster(int numServers, int numListeners, boolean
startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
throws Exception {
final StackTraceElement caller =
JavaUtils.getCallerStackTraceElement();
LOG.info("Running " + caller.getMethodName());
- final CLUSTER cluster = newCluster(numServers);
+ final CLUSTER cluster = newCluster(numServers, numListeners);
try {
if (startCluster) {
cluster.start();
@@ -133,11 +146,15 @@ public abstract class MiniRaftCluster implements
Closeable {
}
default void runWithSameCluster(int numServers, CheckedConsumer<CLUSTER,
Exception> testCase) throws Exception {
+ runWithSameCluster(numServers, 0, testCase);
+ }
+
+ default void runWithSameCluster(int numServers, int numListeners,
CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
final StackTraceElement caller =
JavaUtils.getCallerStackTraceElement();
LOG.info("Running " + caller.getMethodName());
CLUSTER cluster = null;
try {
- cluster = getFactory().reuseCluster(numServers, getProperties());
+ cluster = getFactory().reuseCluster(numServers, numListeners,
getProperties());
testCase.accept(cluster);
} catch(Exception t) {
if (cluster != null) {
@@ -151,14 +168,14 @@ public abstract class MiniRaftCluster implements
Closeable {
private final AtomicReference<CLUSTER> reusableCluster = new
AtomicReference<>();
- private CLUSTER reuseCluster(int numServers, RaftProperties prop) throws
IOException {
+ private CLUSTER reuseCluster(int numServers, int numListeners,
RaftProperties prop) throws IOException {
for(;;) {
final CLUSTER cluster = reusableCluster.get();
if (cluster != null) {
return cluster;
}
- final CLUSTER newCluster = newCluster(numServers, prop);
+ final CLUSTER newCluster = newCluster(numServers, numListeners, prop);
if (reusableCluster.compareAndSet(null, newCluster)) {
newCluster.start();
Runtime.getRuntime().addShutdownHook(new
Thread(newCluster::shutdown));
@@ -168,16 +185,20 @@ public abstract class MiniRaftCluster implements
Closeable {
}
public abstract CLUSTER newCluster(
- String[] ids, RaftProperties prop);
+ String[] ids, String[] listenerIds, RaftProperties prop);
public CLUSTER newCluster(int numServer, RaftProperties prop) {
- return newCluster(generateIds(numServer, 0), prop);
+ return newCluster(numServer, 0, prop);
+ }
+
+ public CLUSTER newCluster(int numServer, int numListeners, RaftProperties
prop) {
+ return newCluster(generateIds(numServer, 0), generateIds(numListeners,
numServer), prop);
}
}
public static abstract class RpcBase extends MiniRaftCluster {
- public RpcBase(String[] ids, RaftProperties properties, Parameters
parameters) {
- super(ids, properties, parameters);
+ public RpcBase(String[] ids, String[] listenerIds, RaftProperties
properties, Parameters parameters) {
+ super(ids, listenerIds, properties, parameters);
}
@Override
@@ -223,17 +244,27 @@ public abstract class MiniRaftCluster implements
Closeable {
}
}
- public static RaftGroup initRaftGroup(Collection<String> ids) {
- Iterator<InetSocketAddress> addresses =
NetUtils.createLocalServerAddress(4 * ids.size()).iterator();
- final RaftPeer[] peers = ids.stream()
- .map(RaftPeerId::valueOf)
- .map(id -> RaftPeer.newBuilder().setId(id)
- .setAddress(addresses.next())
- .setAdminAddress(addresses.next())
- .setClientAddress(addresses.next())
- .setDataStreamAddress(addresses.next())
- .build())
- .toArray(RaftPeer[]::new);
+ public static RaftGroup initRaftGroup(Collection<String> ids,
Collection<String> listenerIds) {
+ Iterator<InetSocketAddress> addresses =
NetUtils.createLocalServerAddress(4 * (ids.size() +
listenerIds.size())).iterator();
+ Stream<RaftPeer> peer = ids.stream()
+ .map(RaftPeerId::valueOf)
+ .map(id -> RaftPeer.newBuilder().setId(id)
+ .setAddress(addresses.next())
+ .setAdminAddress(addresses.next())
+ .setClientAddress(addresses.next())
+ .setDataStreamAddress(addresses.next())
+ .build());
+ Stream<RaftPeer> listener = listenerIds.stream()
+ .map(RaftPeerId::valueOf)
+ .map(id -> RaftPeer.newBuilder().setId(id)
+ .setAddress(addresses.next())
+ .setAdminAddress(addresses.next())
+ .setClientAddress(addresses.next())
+ .setDataStreamAddress(addresses.next())
+ .setStartupRole(RaftProtos.RaftPeerRole.LISTENER)
+ .build());
+ final RaftPeer[] peers = Stream.concat(peer,
listener).toArray(RaftPeer[]::new);
+
return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}
@@ -267,8 +298,8 @@ public abstract class MiniRaftCluster implements Closeable {
private final AtomicReference<Timer> timer = new AtomicReference<>();
- protected MiniRaftCluster(String[] ids, RaftProperties properties,
Parameters parameters) {
- this.group = initRaftGroup(Arrays.asList(ids));
+ protected MiniRaftCluster(String[] ids, String[] listenerIds, RaftProperties
properties, Parameters parameters) {
+ this.group = initRaftGroup(Arrays.asList(ids), Arrays.asList(listenerIds));
LOG.info("new {} with {}", JavaUtils.getClassSimpleName(getClass()),
group);
this.properties = new RaftProperties(properties);
this.parameters = parameters;
@@ -410,11 +441,22 @@ public abstract class MiniRaftCluster implements
Closeable {
public PeerChanges addNewPeers(int number, boolean startNewPeer,
boolean emptyPeer) throws IOException {
- return addNewPeers(generateIds(number, servers.size()), startNewPeer,
emptyPeer);
+ return addNewPeers(generateIds(number, servers.size()), startNewPeer,
emptyPeer,
+ RaftProtos.RaftPeerRole.FOLLOWER);
}
public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
boolean emptyPeer) throws IOException {
+ return addNewPeers(ids, startNewPeer, emptyPeer,
RaftProtos.RaftPeerRole.FOLLOWER);
+ }
+
+ public PeerChanges addNewPeers(int number, boolean startNewPeer,
+ boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException
{
+ return addNewPeers(generateIds(number, servers.size()), startNewPeer,
emptyPeer, startRole);
+ }
+
+ public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
+ boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException
{
LOG.info("Add new peers {}", Arrays.asList(ids));
final Iterable<RaftPeerId> peerIds =
CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf);
@@ -423,8 +465,9 @@ public abstract class MiniRaftCluster implements Closeable {
raftGroup = RaftGroup.valueOf(group.getGroupId(),
Collections.emptyList());
} else {
final Collection<RaftPeer> newPeers =
StreamSupport.stream(peerIds.spliterator(), false)
- .map(id -> RaftPeer.newBuilder().setId(id).build())
- .collect(Collectors.toSet());
+ .map(id -> RaftPeer.newBuilder().setId(id)
+ .setStartupRole(startRole)
+ .build()).collect(Collectors.toSet());
newPeers.addAll(group.getPeers());
raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers);
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 907ea515..437da929 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -40,8 +40,8 @@ public class MiniRaftClusterWithSimulatedRpc extends
MiniRaftCluster {
public static final Factory<MiniRaftClusterWithSimulatedRpc> FACTORY
= new Factory<MiniRaftClusterWithSimulatedRpc>() {
@Override
- public MiniRaftClusterWithSimulatedRpc newCluster(
- String[] ids, RaftProperties prop) {
+ public MiniRaftClusterWithSimulatedRpc newCluster(String[] ids, String[]
listenerIds,
+ RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SimulatedRpc.INSTANCE);
if (ThreadLocalRandom.current().nextBoolean()) {
// turn off simulate latency half of the times.
@@ -54,7 +54,7 @@ public class MiniRaftClusterWithSimulatedRpc extends
MiniRaftCluster {
= new SimulatedRequestReply<>(simulateLatencyMs);
final SimulatedClientRpc client2serverRequestReply
= new SimulatedClientRpc(simulateLatencyMs);
- return new MiniRaftClusterWithSimulatedRpc(ids, prop,
+ return new MiniRaftClusterWithSimulatedRpc(ids, listenerIds, prop,
serverRequestReply, client2serverRequestReply);
}
};
@@ -70,10 +70,10 @@ public class MiniRaftClusterWithSimulatedRpc extends
MiniRaftCluster {
private final SimulatedClientRpc client2serverRequestReply;
private MiniRaftClusterWithSimulatedRpc(
- String[] ids, RaftProperties properties,
+ String[] ids, String[] listenerIds, RaftProperties properties,
SimulatedRequestReply<RaftServerRequest, RaftServerReply>
serverRequestReply,
SimulatedClientRpc client2serverRequestReply) {
- super(ids, properties,
+ super(ids, listenerIds, properties,
SimulatedRpc.Factory.newRaftParameters(serverRequestReply,
client2serverRequestReply));
this.serverRequestReply = serverRequestReply;
this.client2serverRequestReply = client2serverRequestReply;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 1a210d8a..3396ada9 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -40,10 +40,11 @@ public class
MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRa
}
@Override
- public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty
newCluster(String[] ids, RaftProperties prop) {
+ public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty
newCluster(String[] ids,
+ String[] listenerIds, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
- return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids,
prop, parameters);
+ return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids,
listenerIds, prop, parameters);
}
}
@@ -56,9 +57,9 @@ public class
MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRa
}
}
- private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids,
RaftProperties properties,
+ private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids,
String[] listenerIds, RaftProperties properties,
Parameters parameters) {
- super(ids, properties, parameters);
+ super(ids, listenerIds, properties, parameters);
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
index dc3465cd..1e5149b4 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
@@ -34,10 +34,11 @@ public class
MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniR
public static final
Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> FACTORY
= new Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>() {
@Override
- public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty
newCluster(String[] ids, RaftProperties prop) {
+ public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty
newCluster(String[] ids,
+ String[] listenerIds, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
- return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids,
prop);
+ return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids,
listenerIds, prop);
}
};
@@ -48,8 +49,8 @@ public class
MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniR
}
}
- private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids,
RaftProperties properties) {
- super(ids, properties);
+ private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids,
String[] listenerIds, RaftProperties properties) {
+ super(ids, listenerIds, properties);
}
@Override