This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit e80cac5991a8ac6a25705f1f4d046a12598e5e79
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jul 6 10:11:02 2022 -0700

    RATIS-1610. NettyRpcService should not bind the port in the constructor. 
(#668)
---
 .../apache/ratis/netty/server/NettyRpcService.java | 14 +++++++----
 .../apache/ratis/server/impl/MiniRaftCluster.java  | 29 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 48a35921..1f546873 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -44,6 +44,7 @@ import 
org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public final class NettyRpcService extends 
RaftServerRpcWithProxy<NettyRpcProxy,
 
   private final EventLoopGroup bossGroup = new NioEventLoopGroup();
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
-  private final ChannelFuture channelFuture;
+  private final MemoizedSupplier<ChannelFuture> channel;
 
   @ChannelHandler.Sharable
   class InboundHandler extends 
SimpleChannelInboundHandler<RaftNettyServerRequestProto> {
@@ -116,12 +117,12 @@ public final class NettyRpcService extends 
RaftServerRpcWithProxy<NettyRpcProxy,
     };
 
     final int port = NettyConfigKeys.Server.port(server.getProperties());
-    channelFuture = new ServerBootstrap()
+    this.channel = JavaUtils.memoize(() -> new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(initializer)
-        .bind(port);
+        .bind(port));
   }
 
   @Override
@@ -130,13 +131,16 @@ public final class NettyRpcService extends 
RaftServerRpcWithProxy<NettyRpcProxy,
   }
 
   private Channel getChannel() {
-    return channelFuture.awaitUninterruptibly().channel();
+    if (!channel.isInitialized()) {
+      throw new IllegalStateException(getId() + ": Failed to getChannel since 
the service is not yet started");
+    }
+    return channel.get().awaitUninterruptibly().channel();
   }
 
   @Override
   public void startImpl() throws IOException {
     try {
-      channelFuture.syncUninterruptibly();
+      channel.get().syncUninterruptibly();
     } catch(Exception t) {
       throw new IOException(getId() + ": Failed to start " + 
JavaUtils.getClassSimpleName(getClass()), t);
     }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 61db4936..d479ae6e 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -194,12 +194,12 @@ public abstract class MiniRaftCluster implements 
Closeable {
     }
 
     protected String getAddress(RaftPeerId id, RaftGroup g, Function<RaftPeer, 
String> getAddress) {
-      final RaftPeer p = g != null? g.getPeer(id): peers.get(id);
+      final RaftPeer p = getPeer(id, g);
       return p == null? null : getAddress.apply(p);
     }
 
     protected int getDataStreamPort(RaftPeerId id, RaftGroup g) {
-      final RaftPeer p = g != null? g.getPeer(id): peers.get(id);
+      final RaftPeer p = getPeer(id, g);
       final String address = p == null? null : p.getDataStreamAddress();
       return getPort(address);
     }
@@ -292,15 +292,13 @@ public abstract class MiniRaftCluster implements 
Closeable {
   public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean 
format) {
     final RaftServerProxy s = newRaftServer(id, group, format);
     Preconditions.assertTrue(servers.put(id, s) == null);
-    peers.put(id, s.getPeer());
     return s;
   }
 
   private Collection<RaftServer> putNewServers(
       Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) {
     if (emptyPeer) {
-      RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(),
-          Collections.EMPTY_LIST);
+      final RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(), 
Collections.emptyList());
       return StreamSupport.stream(peers.spliterator(), false)
           .map(id -> putNewServer(id, raftGroup, format))
           .collect(Collectors.toList());
@@ -338,6 +336,7 @@ public abstract class MiniRaftCluster implements Closeable {
 
     final RaftServer proxy = putNewServer(serverId, group, format);
     proxy.start();
+    peers.put(proxy.getId(), proxy.getPeer());
     return group == null? null: proxy.getDivision(group.getGroupId());
   }
 
@@ -447,9 +446,10 @@ public abstract class MiniRaftCluster implements Closeable 
{
     return new PeerChanges(p, np, RaftPeer.emptyArray());
   }
 
-  static void startServers(Iterable<? extends RaftServer> servers) throws 
IOException {
+  void startServers(Iterable<? extends RaftServer> servers) throws IOException 
{
     for(RaftServer s : servers) {
       s.start();
+      peers.put(s.getId(), s.getPeer());
     }
   }
 
@@ -672,6 +672,23 @@ public abstract class MiniRaftCluster implements Closeable 
{
     return toRaftPeers(getServers());
   }
 
+  RaftPeer getPeer(RaftPeerId id, RaftGroup group) {
+    RaftPeer p = peers.get(id);
+    if (p != null) {
+      return p;
+    }
+    if (group != null) {
+      p = group.getPeer(id);
+    }
+    if (p == null) {
+      p = 
Optional.ofNullable(servers.get(id)).map(RaftServerProxy::getPeer).orElse(null);
+    }
+    if (p != null) {
+      peers.put(id, p);
+    }
+    return p;
+  }
+
   public RaftGroup getGroup() {
     return group;
   }

Reply via email to