amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501256086
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
private RandomAccessFile stream;
private FileChannel fileChannel;
private File file = new File("client-data-stream");
+ private List<DataStreamClientImpl> clients = new ArrayList<>();
+ private List<DataStreamOutput> streams = new ArrayList<>();
public NettyServerStreamRpc(RaftPeer server){
this.raftServer = server;
setupServer();
}
+ public NettyServerStreamRpc(
+ RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+ this.raftServer = server;
+ setupServer();
+ setupClient(otherPeers, properties);
+ }
+
private ChannelInboundHandler getServerHandler(){
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+ List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ // forward requests to other stream servers.
+ for (DataStreamOutput streamOutput : streams) {
+ CompletableFuture<DataStreamReply> future =
+ streamOutput.streamAsync(req.getBuf().nioBuffer());
+ futures.add(future);
+ }
+
ByteBuffer[] bfs = req.getBuf().nioBuffers();
for(int i = 0; i < bfs.length; i++){
fileChannel.write(bfs[i]);
}
- req.getBuf().release();
- final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
- req.getDataOffset(),
- ByteBuffer.wrap("OK".getBytes()));
- ctx.writeAndFlush(reply);
+
+ try {
+ for (CompletableFuture<DataStreamReply> future : futures) {
+ future.join();
Review comment:
My current understanding is that `ctx.writeAndFlush(reply);` will complete
the future on the client side, so before running this code we need to wait for
the writes on the other peers to complete. That's why I was thinking we need
some sort of `future.join` before `ctx.writeAndFlush(reply);`.
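
A minimal, self-contained sketch of the ordering I have in mind, using only `java.util.concurrent`. Everything here is hypothetical and for illustration only: `forwardToPeer` stands in for `streamOutput.streamAsync(...)`, and the "reply sent" event stands in for `ctx.writeAndFlush(reply)`. It is not the Ratis API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class ReplyAfterPeers {
  // Hypothetical stand-in for forwarding one chunk to a peer (not a Ratis call).
  static CompletableFuture<String> forwardToPeer(String peer, List<String> events) {
    return CompletableFuture.supplyAsync(() -> {
      events.add("peer write done: " + peer);
      return peer;
    });
  }

  // Forwards to every peer, does the local write, then replies only after
  // all forwarded writes have completed.
  static List<String> handleRequest(List<String> peers) {
    List<String> events = new CopyOnWriteArrayList<>();
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (String peer : peers) {
      futures.add(forwardToPeer(peer, events));
    }

    events.add("local write done");

    // Block until every peer write has finished (or rethrow the first failure).
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    // Stand-in for ctx.writeAndFlush(reply): runs strictly after the join.
    events.add("reply sent");
    return events;
  }

  public static void main(String[] args) {
    System.out.println(handleRequest(Arrays.asList("s2", "s3")));
  }
}
```

The relative order of the peer writes and the local write can vary between runs, but "reply sent" always comes last. If blocking inside `channelRead` turns out to be a problem, the same ordering could probably be had without `join` by chaining the reply with `CompletableFuture.allOf(...).thenRun(...)`.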
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]