amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508755380
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
- private List<DataStreamClient> clients = new ArrayList<>();
+ private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+ private final PeerProxyMap<DataStreamClient> proxies;
- public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
- this.raftServer = server;
+ public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine,
RaftProperties properties) {
+ this.name = server + "-" + getClass().getSimpleName();
this.stateMachine = stateMachine;
- this.channelFuture = buildChannel();
+ this.channelFuture = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(getInitializer())
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+ .bind();
+
+ this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer,
properties));
}
- public NettyServerStreamRpc(
- RaftPeer server, List<RaftPeer> otherPeers,
- StateMachine stateMachine, RaftProperties properties){
- this(server, stateMachine);
- setupClient(otherPeers, properties);
+ static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+ final DataStreamClient client = DataStreamClient.newBuilder()
+ .setRaftServer(peer)
+ .setProperties(properties)
+ .build();
+ client.start();
+ return client;
}
- private List<DataStreamOutput> getDataStreamOutput() {
- return clients.stream().map(client ->
client.stream()).collect(Collectors.toList());
+ @Override
+ public void addPeers(Collection<RaftPeer> newPeers) {
+ proxies.addPeers(newPeers);
+ peers.addAll(newPeers);
+ }
+
+ private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+ final List<DataStreamOutput> outs = new ArrayList<>();
+ for(RaftPeer peer : peers) {
+ outs.add(proxies.getProxy(peer.getId()).stream());
Review comment:
Thanks for the update. It is now clearer how the clients and stream
outputs are created and how failures are handled.