szetszwo commented on code in PR #1469:
URL: https://github.com/apache/ratis/pull/1469#discussion_r3299691761
##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -510,6 +577,42 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
     });
   }
+  private static RaftClientRequest toRaftClientRequest(DataStreamRequestByteBuf request) {
+    try {
+      return ClientProtoUtils.toRaftClientRequest(RaftClientRequestProto.parseFrom(request.slice().nioBuffer()));
+    } catch (Throwable e) {
+      throw new CompletionException(e);
+    }
+  }
+
+  private CompletableFuture<Void> submitReadOnlyRequest(DataStreamRequestByteBuf request,
+      RaftClientRequest raftClientRequest, ChannelHandlerContext ctx) {
+    try {
+      final StateMachine.ReadOnlyDataStream readOnlyDataStream = new StateMachine.ReadOnlyDataStream() {
Review Comment:
We should reuse StateMachine.DataChannel, which is a Java
WritableByteChannel. Then we can use other Java APIs such as
FileChannel.transferTo, which can be a highly efficient, zero-copy data
transfer operation.
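```java
final StateMachine.DataChannel readOnlyDataStream = new StateMachine.DataChannel() {
  private long streamOffset;
  private boolean closed = false;

  @Override
  public synchronized boolean isOpen() {
    return !closed;
  }

  @Override
  public synchronized void close() {
    closed = true;
  }

  @Override
  public synchronized void force(boolean metadata) throws AlreadyClosedException {
    if (!isOpen()) {
      throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
    }
    ctx.flush();
  }

  @Override
  public synchronized int write(ByteBuffer buffer) throws IOException {
    if (!isOpen()) {
      throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
    }
    // Record the length before building the reply, in case the helper consumes the buffer.
    final int length = buffer.remaining();
    final DataStreamReplyByteBuffer reply = newDataStreamReadOnlyReplyByteBuffer(request, streamOffset, buffer);
    // Flush as well: the future of a plain write() completes only after a flush.
    final ChannelFuture future = ctx.writeAndFlush(reply);
    try {
      future.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new InterruptedIOException("Interrupted while writing at offset " + streamOffset);
    }
    if (!future.isSuccess()) {
      throw new IOException("Failed to write at offset " + streamOffset, future.cause());
    }
    // WritableByteChannel contract: the buffer position advances by the bytes written.
    buffer.position(buffer.limit());
    streamOffset += length;
    return length;
  }
};
```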
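To illustrate why a WritableByteChannel is convenient here, the following is a minimal, JDK-only sketch of how a state machine might stream a file into such a channel with FileChannel.transferTo. The names `TransferToSketch`, `CollectingChannel` and `transferFile` are hypothetical and not part of Ratis; `CollectingChannel` is only an in-memory stand-in for StateMachine.DataChannel. Whether the transfer is truly zero-copy depends on the JDK, the operating system and the target channel type; for a custom channel like this one, the JDK is expected to fall back to copying through a buffer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {
  /** Hypothetical stand-in for StateMachine.DataChannel: collects bytes in memory. */
  static final class CollectingChannel implements WritableByteChannel {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public synchronized boolean isOpen() {
      return !closed;
    }

    @Override
    public synchronized void close() {
      closed = true;
    }

    @Override
    public synchronized int write(ByteBuffer src) throws IOException {
      if (closed) {
        throw new ClosedChannelException();
      }
      final int length = src.remaining();
      final byte[] bytes = new byte[length];
      src.get(bytes); // advances the buffer position, as the channel contract requires
      out.write(bytes, 0, length);
      return length;
    }

    synchronized byte[] toByteArray() {
      return out.toByteArray();
    }
  }

  /** Transfers the whole file to the target channel and returns the number of bytes sent. */
  static long transferFile(Path file, WritableByteChannel target) throws IOException {
    try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
      final long size = in.size();
      long position = 0;
      // transferTo may move fewer bytes than requested, so loop until done.
      while (position < size) {
        position += in.transferTo(position, size - position, target);
      }
      return position;
    }
  }

  public static void main(String[] args) throws IOException {
    final Path file = Files.createTempFile("transfer-to-sketch", ".bin");
    try {
      Files.write(file, "hello ratis".getBytes(StandardCharsets.UTF_8));
      final CollectingChannel channel = new CollectingChannel();
      final long sent = transferFile(file, channel);
      System.out.println("sent " + sent + " bytes");
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

The caller only needs the WritableByteChannel interface, so the same `transferFile` loop would work unchanged against a channel backed by a Netty context.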