szetszwo commented on code in PR #1469:
URL: https://github.com/apache/ratis/pull/1469#discussion_r3299691761


##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -510,6 +577,42 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
     });
   }
 
+  private static RaftClientRequest toRaftClientRequest(DataStreamRequestByteBuf request) {
+    try {
+      return ClientProtoUtils.toRaftClientRequest(RaftClientRequestProto.parseFrom(request.slice().nioBuffer()));
+    } catch (Throwable e) {
+      throw new CompletionException(e);
+    }
+  }
+
+  private CompletableFuture<Void> submitReadOnlyRequest(DataStreamRequestByteBuf request,
+      RaftClientRequest raftClientRequest, ChannelHandlerContext ctx) {
+    try {
+      final StateMachine.ReadOnlyDataStream readOnlyDataStream = new StateMachine.ReadOnlyDataStream() {

Review Comment:
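   We should reuse StateMachine.DataChannel, which is a Java WritableByteChannel, instead of adding a new ReadOnlyDataStream type.  Then we can use other Java APIs such as FileChannel.transferTo, which can be highly efficient (the JDK may avoid intermediate copies, depending on the platform and the target channel).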
   
   ```java
         final StateMachine.DataChannel readOnlyDataStream = new StateMachine.DataChannel() {
           private long streamOffset;
           private boolean closed = false;
   
           @Override
           public synchronized boolean isOpen() {
             return !closed;
           }
   
           @Override
           public synchronized void close() {
             closed = true;
           }
   
           @Override
           public synchronized void force(boolean metadata) throws AlreadyClosedException {
             if (!isOpen()) {
               throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
             }
             ctx.flush();
           }
   
           @Override
           public synchronized int write(ByteBuffer buffer) throws IOException {
             if (!isOpen()) {
               throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
             }
   
             // Record the length first, in case building the reply consumes the buffer.
             final int length = buffer.remaining();
             final DataStreamReplyByteBuffer reply = newDataStreamReadOnlyReplyByteBuffer(request, streamOffset, buffer);
             // Flush together with the write: a write future does not complete until the data is flushed.
             final ChannelFuture future = ctx.writeAndFlush(reply);
             try {
               future.await();
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new InterruptedIOException();
             }
             if (!future.isSuccess()) {
               throw new IOException("Failed to write at offset " + streamOffset, future.cause());
             }
             // Advance the offset only after the reply has been written successfully.
             streamOffset += length;
             return length;
           }
         };
   ```
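
To illustrate why a plain WritableByteChannel is convenient here, the following is a minimal, self-contained sketch of how a state machine could push a file into such a channel with FileChannel.transferTo. It uses only java.nio; the names `TransferToSketch`, `sendFile` and `CollectingChannel` are hypothetical and are not part of Ratis. `CollectingChannel` is an in-memory stand-in for the StateMachine.DataChannel above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch; not Ratis code.
final class TransferToSketch {
  private TransferToSketch() {}

  /** Transfers the whole file to the target channel and returns the number of bytes sent. */
  static long sendFile(Path file, WritableByteChannel target) throws IOException {
    try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
      final long size = source.size();
      long position = 0;
      while (position < size) {
        // transferTo may transfer fewer bytes than requested, so loop until done.
        final long n = source.transferTo(position, size - position, target);
        if (n <= 0) {
          break; // no progress; return what was sent so far instead of spinning
        }
        position += n;
      }
      return position;
    }
  }

  /** In-memory stand-in for a data channel: collects everything written to it. */
  static final class CollectingChannel implements WritableByteChannel {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean open = true;

    @Override
    public synchronized int write(ByteBuffer src) throws IOException {
      if (!open) {
        throw new ClosedChannelException();
      }
      final int length = src.remaining();
      final byte[] chunk = new byte[length];
      src.get(chunk); // consumes the buffer, as the WritableByteChannel contract expects
      out.write(chunk, 0, length);
      return length;
    }

    @Override
    public synchronized boolean isOpen() {
      return open;
    }

    @Override
    public synchronized void close() {
      open = false;
    }

    synchronized byte[] toByteArray() {
      return out.toByteArray();
    }
  }
}
```

With an arbitrary target channel like this one, the JDK typically copies through an internal buffer; the direct-transfer fast path applies only to certain targets, so any performance gain in the Netty-backed case is an assumption that would need measuring.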


