This is an automated email from the ASF dual-hosted git repository.

guohao1225 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 044b111d9 RATIS-2027. Ratis Streaming: Remote Stream copy data to heap. (#1044)
044b111d9 is described below

commit 044b111d991a619a7c2edcef235e28a7c623f090
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Feb 21 03:32:07 2024 -0800

    RATIS-2027. Ratis Streaming: Remote Stream copy data to heap. (#1044)
    
    * RATIS-2027. Ratis Streaming: Remote Stream copy data to heap.
---
 .../ratis/client/impl/DataStreamClientImpl.java       |  7 ++++++-
 .../apache/ratis/client/impl/OrderedStreamAsync.java  |  4 ++++
 .../datastream/impl}/DataStreamRequestByteBuf.java    |  3 +--
 .../org/apache/ratis/netty/NettyDataStreamUtils.java  | 16 +++++++++++++++-
 .../ratis/netty/client/NettyClientStreamRpc.java      | 12 ++++++++++++
 .../ratis/netty/server/DataStreamManagement.java      | 19 ++++++++++---------
 .../ratis/netty/server/NettyServerStreamRpc.java      | 12 +++++++-----
 7 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 26d01c356..ba91866d7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -40,6 +40,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -169,6 +170,10 @@ public class DataStreamClientImpl implements DataStreamClient {
       return f;
     }
 
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuf src, 
Iterable<WriteOption> options) {
+      return writeAsyncImpl(src, src.readableBytes(), options);
+    }
+
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, 
Iterable<WriteOption> options) {
       return writeAsyncImpl(src, src.remaining(), options);
@@ -235,7 +240,7 @@ public class DataStreamClientImpl implements DataStreamClient {
   }
 
   @Override
-  public DataStreamOutputRpc stream(RaftClientRequest request) {
+  public DataStreamOutputImpl stream(RaftClientRequest request) {
     return new DataStreamOutputImpl(request);
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 989c00cbb..275755514 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -21,12 +21,14 @@ import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
@@ -56,6 +58,8 @@ public class OrderedStreamAsync {
     DataStreamRequest getDataStreamRequest() {
       if (header.getDataLength() == 0) {
         return new DataStreamRequestByteBuffer(header, 
DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+      } else if (data instanceof ByteBuf) {
+        return new DataStreamRequestByteBuf(header, (ByteBuf)data);
       } else if (data instanceof ByteBuffer) {
         return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
       } else if (data instanceof FilePositionCount) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
similarity index 96%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
rename to ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
index 2542b1ec6..1873bec9b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
@@ -16,9 +16,8 @@
  *  limitations under the License.
  */
 
-package org.apache.ratis.netty.server;
+package org.apache.ratis.datastream.impl;
 
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index bd03fefcc..aa46cba53 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -23,7 +23,7 @@ import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
-import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
@@ -128,6 +128,20 @@ public interface NettyDataStreamUtils {
     out.accept(Unpooled.wrappedBuffer(buffer));
   }
 
+  static void encodeDataStreamRequestByteBuf(DataStreamRequestByteBuf request, 
Consumer<Object> out,
+      ByteBufAllocator allocator) {
+    encodeDataStreamRequestHeader(request, out, allocator);
+    encodeByteBuf(request.slice(), out);
+  }
+
+  static void encodeByteBuf(ByteBuf buffer, Consumer<Object> out) {
+    if (buffer.readableBytes() == 0) {
+      out.accept(Unpooled.EMPTY_BUFFER); // to avoid EncoderException: must 
produce at least one message
+      return;
+    }
+    out.accept(buffer);
+  }
+
   static void encodeDataStreamRequestFilePositionCount(
       DataStreamRequestFilePositionCount request, Consumer<Object> out, 
ByteBufAllocator allocator) {
     encodeDataStreamRequestHeader(request, out, allocator);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 020acc2fd..b2dc3812f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -21,6 +21,7 @@ package org.apache.ratis.netty.client;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.StandardWriteOption;
@@ -370,6 +371,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         p.addLast(ENCODER);
         p.addLast(ENCODER_FILE_POSITION_COUNT);
         p.addLast(ENCODER_BYTE_BUFFER);
+        p.addLast(ENCODER_BYTE_BUF);
         p.addLast(newDecoder());
         p.addLast(handler);
       }
@@ -386,6 +388,16 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     }
   }
 
+  static final MessageToMessageEncoder<DataStreamRequestByteBuf> 
ENCODER_BYTE_BUF = new EncoderByteBuf();
+
+  @ChannelHandler.Sharable
+  static class EncoderByteBuf extends 
MessageToMessageEncoder<DataStreamRequestByteBuf> {
+    @Override
+    protected void encode(ChannelHandlerContext context, 
DataStreamRequestByteBuf request, List<Object> out) {
+      NettyDataStreamUtils.encodeDataStreamRequestByteBuf(request, out::add, 
context.alloc());
+    }
+  }
+
   static final MessageToMessageEncoder<DataStreamRequestFilePositionCount> 
ENCODER_FILE_POSITION_COUNT
       = new EncoderFilePositionCount();
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 276a365ce..302aed998 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,10 +18,11 @@
 
 package org.apache.ratis.netty.server;
 
-import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.metrics.Timekeeper;
@@ -111,12 +112,12 @@ public class DataStreamManagement {
   }
 
   static class RemoteStream {
-    private final DataStreamOutputRpc out;
+    private final DataStreamOutputImpl out;
     private final AtomicReference<CompletableFuture<DataStreamReply>> 
sendFuture
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
     private final RequestMetrics metrics;
 
-    RemoteStream(DataStreamOutputRpc out, RequestMetrics metrics) {
+    RemoteStream(DataStreamOutputImpl out, RequestMetrics metrics) {
       this.metrics = metrics;
       this.out = out;
     }
@@ -132,7 +133,7 @@ public class DataStreamManagement {
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, 
Executor executor) {
       final Timekeeper.Context context = metrics.start();
       return composeAsync(sendFuture, executor,
-          n -> out.writeAsync(request.slice().nioBuffer(), 
addFlush(request.getWriteOptionList()))
+          n -> out.writeAsync(request.slice().retain(), 
addFlush(request.getWriteOptionList()))
               .whenComplete((l, e) -> metrics.stop(context, e == null)));
     }
   }
@@ -147,7 +148,7 @@ public class DataStreamManagement {
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
     StreamInfo(RaftClientRequest request, boolean primary, 
CompletableFuture<DataStream> stream, RaftServer server,
-        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams,
+        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputImpl>, IOException> getStreams,
         Function<RequestType, RequestMetrics> metricsConstructor)
         throws IOException {
       this.request = request;
@@ -155,7 +156,7 @@ public class DataStreamManagement {
       this.local = new LocalStream(stream, 
metricsConstructor.apply(RequestType.LOCAL_WRITE));
       this.server = server;
       final Set<RaftPeer> successors = getSuccessors(server.getId());
-      final Set<DataStreamOutputRpc> outs = getStreams.apply(request, 
successors);
+      final Set<DataStreamOutputImpl> outs = getStreams.apply(request, 
successors);
       this.remotes = outs.stream()
           .map(o -> new RemoteStream(o, 
metricsConstructor.apply(RequestType.REMOTE_WRITE)))
           .collect(Collectors.toSet());
@@ -315,7 +316,7 @@ public class DataStreamManagement {
   }
 
   private StreamInfo newStreamInfo(ByteBuf buf,
-      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputImpl>, IOException> getStreams) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
@@ -449,7 +450,7 @@ public class DataStreamManagement {
   }
 
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputImpl>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
     try {
       readImpl(request, ctx, getStreams);
@@ -459,7 +460,7 @@ public class DataStreamManagement {
   }
 
   private void readImpl(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx,
-      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputImpl>, IOException> getStreams) {
     final boolean close = 
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
     ClientInvocationId key =  
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index c5f24b058..451040bb6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,9 +20,11 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
@@ -90,8 +92,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       map.addRaftPeers(newPeers);
     }
 
-    Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, 
Set<RaftPeer> peers) throws IOException {
-      final Set<DataStreamOutputRpc> outs = new HashSet<>();
+    Set<DataStreamOutputImpl> getDataStreamOutput(RaftClientRequest request, 
Set<RaftPeer> peers) throws IOException {
+      final Set<DataStreamOutputImpl> outs = new HashSet<>();
       try {
         getDataStreamOutput(request, peers, outs);
       } catch (IOException e) {
@@ -101,11 +103,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> 
peers, Set<DataStreamOutputRpc> outs)
+    private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> 
peers, Set<DataStreamOutputImpl> outs)
         throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add((DataStreamOutputRpc) 
map.computeIfAbsent(peer).get().stream(request));
+          outs.add((DataStreamOutputImpl) 
map.computeIfAbsent(peer).get().stream(request));
         } catch (IOException e) {
           map.handleException(peer.getId(), e, true);
           throw new IOException(map.getName() + ": Failed to 
getDataStreamOutput for " + peer, e);
@@ -238,7 +240,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       }
 
       @Override
-      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      public void channelInactive(ChannelHandlerContext ctx) {
         requests.cleanUpOnChannelInactive(ctx.channel().id(), 
channelInactiveGracePeriod);
       }
 

Reply via email to