This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit e80cac5991a8ac6a25705f1f4d046a12598e5e79 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Jul 6 10:11:02 2022 -0700 RATIS-1610. NettyRpcService should not bind the port in the constructor. (#668) --- .../apache/ratis/netty/server/NettyRpcService.java | 14 +++++++---- .../apache/ratis/server/impl/MiniRaftCluster.java | 29 +++++++++++++++++----- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 48a35921..1f546873 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -44,6 +44,7 @@ import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto; import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +85,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); - private final ChannelFuture channelFuture; + private final MemoizedSupplier<ChannelFuture> channel; @ChannelHandler.Sharable class InboundHandler extends SimpleChannelInboundHandler<RaftNettyServerRequestProto> { @@ -116,12 +117,12 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, }; final int port = NettyConfigKeys.Server.port(server.getProperties()); - channelFuture = new ServerBootstrap() + this.channel = JavaUtils.memoize(() -> new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(initializer) - .bind(port); + .bind(port)); } @Override @@ -130,13 +131,16 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, } private Channel getChannel() { - return channelFuture.awaitUninterruptibly().channel(); + if (!channel.isInitialized()) { + throw new IllegalStateException(getId() + ": Failed to getChannel since the service is not yet started"); + } + return channel.get().awaitUninterruptibly().channel(); } @Override public void startImpl() throws IOException { try { - channelFuture.syncUninterruptibly(); + channel.get().syncUninterruptibly(); } catch(Exception t) { throw new IOException(getId() + ": Failed to start " + JavaUtils.getClassSimpleName(getClass()), t); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 61db4936..d479ae6e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -194,12 +194,12 @@ public abstract class MiniRaftCluster implements Closeable { } protected String getAddress(RaftPeerId id, RaftGroup g, Function<RaftPeer, String> getAddress) { - final RaftPeer p = g != null? g.getPeer(id): peers.get(id); + final RaftPeer p = getPeer(id, g); return p == null? null : getAddress.apply(p); } protected int getDataStreamPort(RaftPeerId id, RaftGroup g) { - final RaftPeer p = g != null? g.getPeer(id): peers.get(id); + final RaftPeer p = getPeer(id, g); final String address = p == null? null : p.getDataStreamAddress(); return getPort(address); } @@ -292,15 +292,13 @@ public abstract class MiniRaftCluster implements Closeable { public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) { final RaftServerProxy s = newRaftServer(id, group, format); Preconditions.assertTrue(servers.put(id, s) == null); - peers.put(id, s.getPeer()); return s; } private Collection<RaftServer> putNewServers( Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) { if (emptyPeer) { - RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(), - Collections.EMPTY_LIST); + final RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList()); return StreamSupport.stream(peers.spliterator(), false) .map(id -> putNewServer(id, raftGroup, format)) .collect(Collectors.toList()); @@ -338,6 +336,7 @@ public abstract class MiniRaftCluster implements Closeable { final RaftServer proxy = putNewServer(serverId, group, format); proxy.start(); + peers.put(proxy.getId(), proxy.getPeer()); return group == null? null: proxy.getDivision(group.getGroupId()); } @@ -447,9 +446,10 @@ public abstract class MiniRaftCluster implements Closeable { return new PeerChanges(p, np, RaftPeer.emptyArray()); } - static void startServers(Iterable<? extends RaftServer> servers) throws IOException { + void startServers(Iterable<? extends RaftServer> servers) throws IOException { for(RaftServer s : servers) { s.start(); + peers.put(s.getId(), s.getPeer()); } } @@ -672,6 +672,23 @@ public abstract class MiniRaftCluster implements Closeable { return toRaftPeers(getServers()); } + RaftPeer getPeer(RaftPeerId id, RaftGroup group) { + RaftPeer p = peers.get(id); + if (p != null) { + return p; + } + if (group != null) { + p = group.getPeer(id); + } + if (p == null) { + p = Optional.ofNullable(servers.get(id)).map(RaftServerProxy::getPeer).orElse(null); + } + if (p != null) { + peers.put(id, p); + } + return p; + } + public RaftGroup getGroup() { return group; }
