amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508208171
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
- private List<DataStreamClient> clients = new ArrayList<>();
+ private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+ private final PeerProxyMap<DataStreamClient> proxies;
- public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
- this.raftServer = server;
+ public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+ this.name = server + "-" + getClass().getSimpleName();
this.stateMachine = stateMachine;
- this.channelFuture = buildChannel();
+ this.channelFuture = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(getInitializer())
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+ .bind();
+
+ this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
}
- public NettyServerStreamRpc(
- RaftPeer server, List<RaftPeer> otherPeers,
- StateMachine stateMachine, RaftProperties properties){
- this(server, stateMachine);
- setupClient(otherPeers, properties);
+ static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+ final DataStreamClient client = DataStreamClient.newBuilder()
+ .setRaftServer(peer)
+ .setProperties(properties)
+ .build();
+ client.start();
+ return client;
}
- private List<DataStreamOutput> getDataStreamOutput() {
- return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+ @Override
+ public void addPeers(Collection<RaftPeer> newPeers) {
+ proxies.addPeers(newPeers);
+ peers.addAll(newPeers);
+ }
+
+ private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+ final List<DataStreamOutput> outs = new ArrayList<>();
+ for(RaftPeer peer : peers) {
+ outs.add(proxies.getProxy(peer.getId()).stream());
Review comment:
What happens if no client exists yet for a peer, so this
`NettyServerStreamRpc` tries to create one via `newClient`, and that
creation also fails? Will it throw an `IOException`?
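
To make the question concrete, here is a minimal sketch of the behavior I have in mind. The names `LazyProxyMap` and `ProxyFactory` are hypothetical stand-ins, not the actual `PeerProxyMap` API, and the sketch assumes the proxy is created lazily on the first `getProxy` call. Whether the real code behaves this way is exactly what I am asking.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a peer-to-proxy map; NOT the Ratis PeerProxyMap. */
final class LazyProxyMap<P> {
  /** Hypothetical factory that may fail with a checked exception. */
  interface ProxyFactory<P> {
    P create(String peerId) throws IOException;
  }

  private final Map<String, P> proxies = new HashMap<>();
  private final ProxyFactory<P> factory;

  LazyProxyMap(ProxyFactory<P> factory) {
    this.factory = factory;
  }

  /** Returns the cached proxy for the peer, creating it on first use. */
  synchronized P getProxy(String peerId) throws IOException {
    P proxy = proxies.get(peerId);
    if (proxy == null) {
      // If creation fails, nothing is cached and the IOException
      // propagates to the caller, who must handle or rethrow it.
      proxy = factory.create(peerId);
      proxies.put(peerId, proxy);
    }
    return proxy;
  }
}
```

In this sketch a failed creation surfaces as a checked `IOException` from `getProxy`, which would fit the new `throws IOException` on `getDataStreamOutput()`. Since `newClient` in this diff does not declare `IOException`, a failure inside it might instead surface as an unchecked exception.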