This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e695606 RATIS-1635. Support listener in MiniRaftCluster (#692)
7e695606 is described below

commit 7e695606aefa3c5d3c7741d1128db24ecfba9502
Author: Yaolong Liu <[email protected]>
AuthorDate: Sun Jul 24 06:31:45 2022 +0800

    RATIS-1635. Support listener in MiniRaftCluster (#692)
---
 .../ratis/examples/ParameterizedBaseTest.java      |  2 +-
 .../apache/ratis/grpc/MiniRaftClusterWithGrpc.java |  8 +-
 .../ratis/netty/MiniRaftClusterWithNetty.java      |  8 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  | 97 ++++++++++++++++------
 .../MiniRaftClusterWithSimulatedRpc.java           | 10 +--
 ...usterWithRpcTypeGrpcAndDataStreamTypeNetty.java |  9 +-
 ...sterWithRpcTypeNettyAndDataStreamTypeNetty.java |  9 +-
 7 files changed, 94 insertions(+), 49 deletions(-)

diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index 72f4ee4b..9352a24e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -77,7 +77,7 @@ public abstract class ParameterizedBaseTest extends BaseTest {
   private static void add(
       Collection<Object[]> clusters, MiniRaftCluster.Factory factory,
       String[] ids, RaftProperties properties) {
-    clusters.add(new Object[]{factory.newCluster(ids, properties)});
+    clusters.add(new Object[]{factory.newCluster(ids, new String[] {}, properties)});
   }
 
   public static Collection<Object[]> getMiniRaftClusters(
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 097a7627..18c65c5b 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -39,9 +39,9 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithGrpc> FACTORY
       = new Factory<MiniRaftClusterWithGrpc>() {
     @Override
-    public MiniRaftClusterWithGrpc newCluster(String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
-      return new MiniRaftClusterWithGrpc(ids, prop, null);
+      return new MiniRaftClusterWithGrpc(ids, listenerIds, prop, null);
     }
   };
 
@@ -55,8 +55,8 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
   public static final DelayLocalExecutionInjection sendServerRequestInjection =
       new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
 
-  protected MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties, Parameters parameters) {
-    super(ids, properties, parameters);
+  protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
+    super(ids, listenerIds, properties, parameters);
   }
 
   @Override
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 16ccffd6..990b63d9 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -35,9 +35,9 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithNetty> FACTORY
       = new Factory<MiniRaftClusterWithNetty>() {
     @Override
-    public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithNetty newCluster(String[] ids, String[] listenerIds, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
-      return new MiniRaftClusterWithNetty(ids, prop);
+      return new MiniRaftClusterWithNetty(ids, listenerIds, prop);
     }
   };
 
@@ -51,8 +51,8 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   public static final DelayLocalExecutionInjection sendServerRequest
       = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
 
-  protected MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
-    super(ids, properties, null);
+  protected MiniRaftClusterWithNetty(String[] ids, String[] listenerIds, RaftProperties properties) {
+    super(ids, listenerIds, properties, null);
   }
 
   @Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 39683f6f..f5cd38b3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -106,18 +107,30 @@ public abstract class MiniRaftCluster implements Closeable {
       }
 
       default CLUSTER newCluster(int numPeers) {
-        return getFactory().newCluster(numPeers, getProperties());
+        return newCluster(numPeers, 0);
+      }
+
+      default CLUSTER newCluster(int numPeers, int numListeners) {
+        return getFactory().newCluster(numPeers, numListeners, getProperties());
       }
 
       default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
-        runWithNewCluster(numServers, true, testCase);
+        runWithNewCluster(numServers, 0, true, testCase);
       }
 
-      default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
+      default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+        runWithNewCluster(numServers, 0, startCluster, testCase);
+      }
+
+      default void runWithNewCluster(int numServers, int numListeners, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+        runWithNewCluster(numServers, numListeners, true, testCase);
+      }
+
+      default void runWithNewCluster(int numServers, int numListeners, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
           throws Exception {
         final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
         LOG.info("Running " + caller.getMethodName());
-        final CLUSTER cluster = newCluster(numServers);
+        final CLUSTER cluster = newCluster(numServers, numListeners);
         try {
           if (startCluster) {
             cluster.start();
@@ -133,11 +146,15 @@ public abstract class MiniRaftCluster implements Closeable {
       }
 
       default void runWithSameCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+        runWithSameCluster(numServers, 0, testCase);
+      }
+
+      default void runWithSameCluster(int numServers, int numListeners, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
         final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
         LOG.info("Running " + caller.getMethodName());
         CLUSTER cluster = null;
         try {
-          cluster = getFactory().reuseCluster(numServers, getProperties());
+          cluster = getFactory().reuseCluster(numServers, numListeners, getProperties());
           testCase.accept(cluster);
         } catch(Exception t) {
           if (cluster != null) {
@@ -151,14 +168,14 @@ public abstract class MiniRaftCluster implements Closeable {
 
     private final AtomicReference<CLUSTER> reusableCluster = new AtomicReference<>();
 
-    private CLUSTER reuseCluster(int numServers, RaftProperties prop) throws IOException {
+    private CLUSTER reuseCluster(int numServers, int numListeners, RaftProperties prop) throws IOException {
       for(;;) {
         final CLUSTER cluster = reusableCluster.get();
         if (cluster != null) {
           return cluster;
         }
 
-        final CLUSTER newCluster = newCluster(numServers, prop);
+        final CLUSTER newCluster = newCluster(numServers, numListeners, prop);
         if (reusableCluster.compareAndSet(null, newCluster)) {
           newCluster.start();
           Runtime.getRuntime().addShutdownHook(new Thread(newCluster::shutdown));
@@ -168,16 +185,20 @@ public abstract class MiniRaftCluster implements Closeable {
     }
 
     public abstract CLUSTER newCluster(
-        String[] ids, RaftProperties prop);
+        String[] ids, String[] listenerIds, RaftProperties prop);
 
     public CLUSTER newCluster(int numServer, RaftProperties prop) {
-      return newCluster(generateIds(numServer, 0), prop);
+      return newCluster(numServer, 0, prop);
+    }
+
+    public CLUSTER newCluster(int numServer, int numListeners, RaftProperties prop) {
+      return newCluster(generateIds(numServer, 0), generateIds(numListeners, numServer), prop);
     }
   }
 
   public static abstract class RpcBase extends MiniRaftCluster {
-    public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) {
-      super(ids, properties, parameters);
+    public RpcBase(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
+      super(ids, listenerIds, properties, parameters);
     }
 
     @Override
@@ -223,17 +244,27 @@ public abstract class MiniRaftCluster implements Closeable {
     }
   }
 
-  public static RaftGroup initRaftGroup(Collection<String> ids) {
-    Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * ids.size()).iterator();
-    final RaftPeer[] peers = ids.stream()
-        .map(RaftPeerId::valueOf)
-        .map(id -> RaftPeer.newBuilder().setId(id)
-            .setAddress(addresses.next())
-            .setAdminAddress(addresses.next())
-            .setClientAddress(addresses.next())
-            .setDataStreamAddress(addresses.next())
-            .build())
-        .toArray(RaftPeer[]::new);
+  public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) {
+    Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * (ids.size() + listenerIds.size())).iterator();
+    Stream<RaftPeer> peer = ids.stream()
+            .map(RaftPeerId::valueOf)
+            .map(id -> RaftPeer.newBuilder().setId(id)
+                .setAddress(addresses.next())
+                .setAdminAddress(addresses.next())
+                .setClientAddress(addresses.next())
+                .setDataStreamAddress(addresses.next())
+                .build());
+    Stream<RaftPeer> listener = listenerIds.stream()
+            .map(RaftPeerId::valueOf)
+            .map(id -> RaftPeer.newBuilder().setId(id)
+                .setAddress(addresses.next())
+                .setAdminAddress(addresses.next())
+                .setClientAddress(addresses.next())
+                .setDataStreamAddress(addresses.next())
+                .setStartupRole(RaftProtos.RaftPeerRole.LISTENER)
+                .build());
+    final RaftPeer[] peers = Stream.concat(peer, listener).toArray(RaftPeer[]::new);
+
     return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
   }
 
@@ -267,8 +298,8 @@ public abstract class MiniRaftCluster implements Closeable {
 
   private final AtomicReference<Timer> timer = new AtomicReference<>();
 
-  protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
-    this.group = initRaftGroup(Arrays.asList(ids));
+  protected MiniRaftCluster(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
+    this.group = initRaftGroup(Arrays.asList(ids), Arrays.asList(listenerIds));
     LOG.info("new {} with {}", JavaUtils.getClassSimpleName(getClass()), group);
     this.properties = new RaftProperties(properties);
     this.parameters = parameters;
@@ -410,11 +441,22 @@ public abstract class MiniRaftCluster implements Closeable {
 
   public PeerChanges addNewPeers(int number, boolean startNewPeer,
       boolean emptyPeer) throws IOException {
-    return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer);
+    return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer,
+        RaftProtos.RaftPeerRole.FOLLOWER);
   }
 
   public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
       boolean emptyPeer) throws IOException {
+    return addNewPeers(ids, startNewPeer, emptyPeer, RaftProtos.RaftPeerRole.FOLLOWER);
+  }
+
+  public PeerChanges addNewPeers(int number, boolean startNewPeer,
+      boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException {
+    return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer, startRole);
+  }
+
+  public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
+      boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException {
     LOG.info("Add new peers {}", Arrays.asList(ids));
 
     final Iterable<RaftPeerId> peerIds = CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf);
@@ -423,8 +465,9 @@ public abstract class MiniRaftCluster implements Closeable {
       raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList());
     } else {
       final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false)
-          .map(id -> RaftPeer.newBuilder().setId(id).build())
-          .collect(Collectors.toSet());
+          .map(id -> RaftPeer.newBuilder().setId(id)
+              .setStartupRole(startRole)
+              .build()).collect(Collectors.toSet());
       newPeers.addAll(group.getPeers());
       raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers);
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 907ea515..437da929 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -40,8 +40,8 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
   public static final Factory<MiniRaftClusterWithSimulatedRpc> FACTORY
       = new Factory<MiniRaftClusterWithSimulatedRpc>() {
     @Override
-    public MiniRaftClusterWithSimulatedRpc newCluster(
-        String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithSimulatedRpc newCluster(String[] ids, String[] listenerIds,
+        RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SimulatedRpc.INSTANCE);
       if (ThreadLocalRandom.current().nextBoolean()) {
         // turn off simulate latency half of the times.
@@ -54,7 +54,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
           = new SimulatedRequestReply<>(simulateLatencyMs);
       final SimulatedClientRpc client2serverRequestReply
           = new SimulatedClientRpc(simulateLatencyMs);
-      return new MiniRaftClusterWithSimulatedRpc(ids, prop,
+      return new MiniRaftClusterWithSimulatedRpc(ids, listenerIds, prop,
           serverRequestReply, client2serverRequestReply);
     }
   };
@@ -70,10 +70,10 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
   private final SimulatedClientRpc client2serverRequestReply;
 
   private MiniRaftClusterWithSimulatedRpc(
-      String[] ids, RaftProperties properties,
+      String[] ids, String[] listenerIds, RaftProperties properties,
       SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
       SimulatedClientRpc client2serverRequestReply) {
-    super(ids, properties,
+    super(ids, listenerIds, properties,
         SimulatedRpc.Factory.newRaftParameters(serverRequestReply, client2serverRequestReply));
     this.serverRequestReply = serverRequestReply;
     this.client2serverRequestReply = client2serverRequestReply;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 1a210d8a..3396ada9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -40,10 +40,11 @@ public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRa
     }
 
     @Override
-    public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty newCluster(String[] ids,
+        String[] listenerIds, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
       RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
-      return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids, prop, parameters);
+      return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids, listenerIds, prop, parameters);
     }
   }
 
@@ -56,9 +57,9 @@ public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRa
     }
   }
 
-  private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids, RaftProperties properties,
+  private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids, String[] listenerIds, RaftProperties properties,
       Parameters parameters) {
-    super(ids, properties, parameters);
+    super(ids, listenerIds, properties, parameters);
   }
 
   @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
index dc3465cd..1e5149b4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
@@ -34,10 +34,11 @@ public class MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniR
   public static final Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> FACTORY
       = new Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>() {
     @Override
-    public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty newCluster(String[] ids,
+        String[] listenerIds, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
       RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
-      return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids, prop);
+      return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids, listenerIds, prop);
     }
   };
 
@@ -48,8 +49,8 @@ public class MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniR
     }
   }
 
-  private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids, RaftProperties properties) {
-    super(ids, properties);
+  private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids, String[] listenerIds, RaftProperties properties) {
+    super(ids, listenerIds, properties);
   }
 
   @Override

Reply via email to