szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499194565
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
*/
void startServer();
+ /**
+ * start clients that used to forward requests to peers.
+ */
+ void startClientToPeers();
Review comment:
startServer should take care of the initial setup. Would there be a case
where startServer is called but startClientToPeers is not?
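If there is no such case, one option is to keep a single entry point and let it start the peer clients as part of its own setup. The following is only a minimal sketch of that idea: `SingleEntryRpcSketch` and its two boolean flags are hypothetical stand-ins, not the actual `DataStreamServerRpc` implementation.

```java
public class SingleEntryRpcSketch {
  // Hypothetical state flags standing in for real server/client startup.
  private boolean serverStarted;
  private boolean peerClientsStarted;

  /** Single public entry point: also starts the clients that forward to peers. */
  public void startServer() {
    serverStarted = true;      // placeholder for binding the server channel
    startClientToPeers();      // peer clients are part of the initial setup
  }

  /** Kept private so callers cannot start the server without the clients. */
  private void startClientToPeers() {
    peerClientsStarted = true; // placeholder for connecting to the other peers
  }

  public boolean isServerStarted() {
    return serverStarted;
  }

  public boolean isPeerClientsStarted() {
    return peerClientsStarted;
  }

  public static void main(String[] args) {
    SingleEntryRpcSketch rpc = new SingleEntryRpcSketch();
    rpc.startServer();
    System.out.println("server=" + rpc.isServerStarted()
        + ", peerClients=" + rpc.isPeerClientsStarted());
  }
}
```

If some caller does need the server without peer clients, a separate method would still be justified.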
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
private RandomAccessFile stream;
private FileChannel fileChannel;
private File file = new File("client-data-stream");
+ private List<DataStreamClientImpl> clients = new ArrayList<>();
+ private List<DataStreamOutput> streams = new ArrayList<>();
public NettyServerStreamRpc(RaftPeer server){
this.raftServer = server;
setupServer();
}
+ public NettyServerStreamRpc(
+ RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+ this.raftServer = server;
+ setupServer();
+ setupClient(otherPeers, properties);
+ }
+
private ChannelInboundHandler getServerHandler(){
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+ List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ // forward requests to other stream servers.
+ for (DataStreamOutput streamOutput : streams) {
+ CompletableFuture<DataStreamReply> future =
+ streamOutput.streamAsync(req.getBuf().nioBuffer());
+ futures.add(future);
+ }
+
ByteBuffer[] bfs = req.getBuf().nioBuffers();
for(int i = 0; i < bfs.length; i++){
fileChannel.write(bfs[i]);
}
- req.getBuf().release();
- final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
- req.getDataOffset(),
- ByteBuffer.wrap("OK".getBytes()));
- ctx.writeAndFlush(reply);
+
+ try {
+ for (CompletableFuture<DataStreamReply> future : futures) {
+ future.join();
Review comment:
Read should not wait for the futures; otherwise, it will slow down reads.
We should have another thread pool to wait, handle exceptions/retries, and send
replies.
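As a rough illustration of this suggestion, the sketch below combines the forwarding futures with `CompletableFuture.allOf` and runs the reply step on a separate executor via `handleAsync`, so the reading thread never calls `join()`. The class name `NonBlockingForwardSketch`, the helper `replyWhenAllDone`, and the `sendReply` callback are hypothetical names for this example; retries are not shown, and nothing here is the actual Ratis code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class NonBlockingForwardSketch {
  /**
   * Returns immediately. Once every forwarding future has completed,
   * sendReply runs on replyExecutor with null on success, or with the
   * failure (wrapped in a CompletionException) if any forward failed.
   */
  static <T> CompletableFuture<Void> replyWhenAllDone(
      List<CompletableFuture<T>> futures,
      ExecutorService replyExecutor,
      Consumer<Throwable> sendReply) {
    return CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        .<Void>handleAsync((ignored, error) -> {
          // Runs on replyExecutor, not on the thread that read the request.
          sendReply.accept(error);
          return null;
        }, replyExecutor);
  }

  public static void main(String[] args) {
    ExecutorService replyExecutor = Executors.newSingleThreadExecutor();

    // Stand-ins for the futures returned by forwarding to two peers.
    CompletableFuture<String> peer1 = new CompletableFuture<>();
    CompletableFuture<String> peer2 = new CompletableFuture<>();

    CompletableFuture<Void> replied = replyWhenAllDone(
        Arrays.asList(peer1, peer2),
        replyExecutor,
        error -> System.out.println(error == null ? "reply: OK" : "reply: failed, " + error));

    // The "read" path is free to continue here; nothing has blocked.
    peer1.complete("ack-1");
    peer2.complete("ack-2");

    replied.join(); // only the demo waits, so the JVM can exit cleanly
    replyExecutor.shutdown();
  }
}
```

In a handler like the one in this diff, the callback would be the place to release the buffer and call `ctx.writeAndFlush(reply)`, assuming those operations are safe to invoke from the reply thread.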