szetszwo commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508239363
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
- private List<DataStreamClient> clients = new ArrayList<>();
+ private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+ private final PeerProxyMap<DataStreamClient> proxies;
- public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
- this.raftServer = server;
+ public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+ this.name = server + "-" + getClass().getSimpleName();
this.stateMachine = stateMachine;
- this.channelFuture = buildChannel();
+ this.channelFuture = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(getInitializer())
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+ .bind();
+
+ this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
}
- public NettyServerStreamRpc(
- RaftPeer server, List<RaftPeer> otherPeers,
- StateMachine stateMachine, RaftProperties properties){
- this(server, stateMachine);
- setupClient(otherPeers, properties);
+ static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+ final DataStreamClient client = DataStreamClient.newBuilder()
+ .setRaftServer(peer)
+ .setProperties(properties)
+ .build();
+ client.start();
+ return client;
}
- private List<DataStreamOutput> getDataStreamOutput() {
- return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+ @Override
+ public void addPeers(Collection<RaftPeer> newPeers) {
+ proxies.addPeers(newPeers);
+ peers.addAll(newPeers);
+ }
+
+ private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+ final List<DataStreamOutput> outs = new ArrayList<>();
+ for(RaftPeer peer : peers) {
+ outs.add(proxies.getProxy(peer.getId()).stream());
Review comment:
peers and proxies must stay consistent with each other: every peer in
the list needs a matching proxy. Let me refactor the code to make that clear.
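
One possible way to enforce that invariant, offered only as a sketch and not as the refactoring that actually landed in the PR: keep a single insertion-ordered map as the source of truth, so the peer set and the proxy set cannot drift apart. The class name ConsistentPeerProxies, its type parameters, and the factory argument are hypothetical and are not part of the Ratis API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical helper (not a Ratis class): stores peers and their proxies
 * in one map, so a peer can never exist without a proxy or vice versa.
 */
final class ConsistentPeerProxies<ID, PROXY> {
  // Insertion-ordered, so proxies are returned in the order peers were added.
  private final Map<ID, PROXY> proxies = new LinkedHashMap<>();
  // Assumed factory that creates a proxy (e.g. a stream client) for a peer id.
  private final Function<ID, PROXY> factory;

  ConsistentPeerProxies(Function<ID, PROXY> factory) {
    this.factory = factory;
  }

  /** Adds each new peer together with its proxy; known peers are skipped. */
  synchronized void addPeers(Collection<ID> newPeers) {
    for (ID id : newPeers) {
      proxies.computeIfAbsent(id, factory);
    }
  }

  /** Returns a snapshot of all proxies, one per known peer. */
  synchronized List<PROXY> getProxies() {
    return new ArrayList<>(proxies.values());
  }

  /** Returns the number of known peers, which always equals the number of proxies. */
  synchronized int size() {
    return proxies.size();
  }
}
```

With this shape, a method like getDataStreamOutput() would iterate over getProxies() instead of looking each peer up in a second structure, so a missing proxy for a listed peer is impossible by construction.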