This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c320f93c RATIS-1610. NettyRpcService should not bind the port in the constructor. (#668)
c320f93c is described below
commit c320f93c5a510f07933536e28e507ef9c3ba3531
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jul 6 10:11:02 2022 -0700
RATIS-1610. NettyRpcService should not bind the port in the constructor. (#668)
---
.../apache/ratis/netty/server/NettyRpcService.java | 14 +++++++----
.../apache/ratis/server/impl/MiniRaftCluster.java | 29 +++++++++++++++++-----
2 files changed, 32 insertions(+), 11 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 48a35921..1f546873 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -44,6 +44,7 @@ import
org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public final class NettyRpcService extends
RaftServerRpcWithProxy<NettyRpcProxy,
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
- private final ChannelFuture channelFuture;
+ private final MemoizedSupplier<ChannelFuture> channel;
@ChannelHandler.Sharable
class InboundHandler extends
SimpleChannelInboundHandler<RaftNettyServerRequestProto> {
@@ -116,12 +117,12 @@ public final class NettyRpcService extends
RaftServerRpcWithProxy<NettyRpcProxy,
};
final int port = NettyConfigKeys.Server.port(server.getProperties());
- channelFuture = new ServerBootstrap()
+ this.channel = JavaUtils.memoize(() -> new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(initializer)
- .bind(port);
+ .bind(port));
}
@Override
@@ -130,13 +131,16 @@ public final class NettyRpcService extends
RaftServerRpcWithProxy<NettyRpcProxy,
}
private Channel getChannel() {
- return channelFuture.awaitUninterruptibly().channel();
+ if (!channel.isInitialized()) {
+ throw new IllegalStateException(getId() + ": Failed to getChannel since the service is not yet started");
+ }
+ return channel.get().awaitUninterruptibly().channel();
}
@Override
public void startImpl() throws IOException {
try {
- channelFuture.syncUninterruptibly();
+ channel.get().syncUninterruptibly();
} catch(Exception t) {
throw new IOException(getId() + ": Failed to start " +
JavaUtils.getClassSimpleName(getClass()), t);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 61db4936..d479ae6e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -194,12 +194,12 @@ public abstract class MiniRaftCluster implements
Closeable {
}
protected String getAddress(RaftPeerId id, RaftGroup g, Function<RaftPeer,
String> getAddress) {
- final RaftPeer p = g != null? g.getPeer(id): peers.get(id);
+ final RaftPeer p = getPeer(id, g);
return p == null? null : getAddress.apply(p);
}
protected int getDataStreamPort(RaftPeerId id, RaftGroup g) {
- final RaftPeer p = g != null? g.getPeer(id): peers.get(id);
+ final RaftPeer p = getPeer(id, g);
final String address = p == null? null : p.getDataStreamAddress();
return getPort(address);
}
@@ -292,15 +292,13 @@ public abstract class MiniRaftCluster implements
Closeable {
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean
format) {
final RaftServerProxy s = newRaftServer(id, group, format);
Preconditions.assertTrue(servers.put(id, s) == null);
- peers.put(id, s.getPeer());
return s;
}
private Collection<RaftServer> putNewServers(
Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) {
if (emptyPeer) {
- RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(),
- Collections.EMPTY_LIST);
+ final RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(),
Collections.emptyList());
return StreamSupport.stream(peers.spliterator(), false)
.map(id -> putNewServer(id, raftGroup, format))
.collect(Collectors.toList());
@@ -338,6 +336,7 @@ public abstract class MiniRaftCluster implements Closeable {
final RaftServer proxy = putNewServer(serverId, group, format);
proxy.start();
+ peers.put(proxy.getId(), proxy.getPeer());
return group == null? null: proxy.getDivision(group.getGroupId());
}
@@ -447,9 +446,10 @@ public abstract class MiniRaftCluster implements Closeable
{
return new PeerChanges(p, np, RaftPeer.emptyArray());
}
- static void startServers(Iterable<? extends RaftServer> servers) throws
IOException {
+ void startServers(Iterable<? extends RaftServer> servers) throws IOException
{
for(RaftServer s : servers) {
s.start();
+ peers.put(s.getId(), s.getPeer());
}
}
@@ -672,6 +672,23 @@ public abstract class MiniRaftCluster implements Closeable
{
return toRaftPeers(getServers());
}
+ RaftPeer getPeer(RaftPeerId id, RaftGroup group) {
+ RaftPeer p = peers.get(id);
+ if (p != null) {
+ return p;
+ }
+ if (group != null) {
+ p = group.getPeer(id);
+ }
+ if (p == null) {
+ p = Optional.ofNullable(servers.get(id)).map(RaftServerProxy::getPeer).orElse(null);
+ }
+ if (p != null) {
+ peers.put(id, p);
+ }
+ return p;
+ }
+
public RaftGroup getGroup() {
return group;
}