This is an automated email from the ASF dual-hosted git repository.
guohao1225 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 044b111d9 RATIS-2027. Ratis Streaming: Remote Stream copy data to heap. (#1044)
044b111d9 is described below
commit 044b111d991a619a7c2edcef235e28a7c623f090
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Feb 21 03:32:07 2024 -0800
RATIS-2027. Ratis Streaming: Remote Stream copy data to heap. (#1044)
* RATIS-2027. Ratis Streaming: Remote Stream copy data to heap.
---
.../ratis/client/impl/DataStreamClientImpl.java | 7 ++++++-
.../apache/ratis/client/impl/OrderedStreamAsync.java | 4 ++++
.../datastream/impl}/DataStreamRequestByteBuf.java | 3 +--
.../org/apache/ratis/netty/NettyDataStreamUtils.java | 16 +++++++++++++++-
.../ratis/netty/client/NettyClientStreamRpc.java | 12 ++++++++++++
.../ratis/netty/server/DataStreamManagement.java | 19 ++++++++++---------
.../ratis/netty/server/NettyServerStreamRpc.java | 12 +++++++-----
7 files changed, 55 insertions(+), 18 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 26d01c356..ba91866d7 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -40,6 +40,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -169,6 +170,10 @@ public class DataStreamClientImpl implements
DataStreamClient {
return f;
}
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuf src,
Iterable<WriteOption> options) {
+ return writeAsyncImpl(src, src.readableBytes(), options);
+ }
+
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
Iterable<WriteOption> options) {
return writeAsyncImpl(src, src.remaining(), options);
@@ -235,7 +240,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
}
@Override
- public DataStreamOutputRpc stream(RaftClientRequest request) {
+ public DataStreamOutputImpl stream(RaftClientRequest request) {
return new DataStreamOutputImpl(request);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 989c00cbb..275755514 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -21,12 +21,14 @@ import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
@@ -56,6 +58,8 @@ public class OrderedStreamAsync {
DataStreamRequest getDataStreamRequest() {
if (header.getDataLength() == 0) {
return new DataStreamRequestByteBuffer(header,
DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+ } else if (data instanceof ByteBuf) {
+ return new DataStreamRequestByteBuf(header, (ByteBuf)data);
} else if (data instanceof ByteBuffer) {
return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
} else if (data instanceof FilePositionCount) {
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
similarity index 96%
rename from
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
rename to
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
index 2542b1ec6..1873bec9b 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.server;
+package org.apache.ratis.datastream.impl;
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index bd03fefcc..aa46cba53 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -23,7 +23,7 @@ import
org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
-import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
@@ -128,6 +128,20 @@ public interface NettyDataStreamUtils {
out.accept(Unpooled.wrappedBuffer(buffer));
}
+ static void encodeDataStreamRequestByteBuf(DataStreamRequestByteBuf request,
Consumer<Object> out,
+ ByteBufAllocator allocator) {
+ encodeDataStreamRequestHeader(request, out, allocator);
+ encodeByteBuf(request.slice(), out);
+ }
+
+ static void encodeByteBuf(ByteBuf buffer, Consumer<Object> out) {
+ if (buffer.readableBytes() == 0) {
+ out.accept(Unpooled.EMPTY_BUFFER); // to avoid EncoderException: must produce at least one message
+ return;
+ }
+ out.accept(buffer);
+ }
+
static void encodeDataStreamRequestFilePositionCount(
DataStreamRequestFilePositionCount request, Consumer<Object> out,
ByteBufAllocator allocator) {
encodeDataStreamRequestHeader(request, out, allocator);
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 020acc2fd..b2dc3812f 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -21,6 +21,7 @@ package org.apache.ratis.netty.client;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
@@ -370,6 +371,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
p.addLast(ENCODER);
p.addLast(ENCODER_FILE_POSITION_COUNT);
p.addLast(ENCODER_BYTE_BUFFER);
+ p.addLast(ENCODER_BYTE_BUF);
p.addLast(newDecoder());
p.addLast(handler);
}
@@ -386,6 +388,16 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
}
+ static final MessageToMessageEncoder<DataStreamRequestByteBuf>
ENCODER_BYTE_BUF = new EncoderByteBuf();
+
+ @ChannelHandler.Sharable
+ static class EncoderByteBuf extends
MessageToMessageEncoder<DataStreamRequestByteBuf> {
+ @Override
+ protected void encode(ChannelHandlerContext context,
DataStreamRequestByteBuf request, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamRequestByteBuf(request, out::add,
context.alloc());
+ }
+ }
+
static final MessageToMessageEncoder<DataStreamRequestFilePositionCount>
ENCODER_FILE_POSITION_COUNT
= new EncoderFilePositionCount();
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 276a365ce..302aed998 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,10 +18,11 @@
package org.apache.ratis.netty.server;
-import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.metrics.Timekeeper;
@@ -111,12 +112,12 @@ public class DataStreamManagement {
}
static class RemoteStream {
- private final DataStreamOutputRpc out;
+ private final DataStreamOutputImpl out;
private final AtomicReference<CompletableFuture<DataStreamReply>>
sendFuture
= new AtomicReference<>(CompletableFuture.completedFuture(null));
private final RequestMetrics metrics;
- RemoteStream(DataStreamOutputRpc out, RequestMetrics metrics) {
+ RemoteStream(DataStreamOutputImpl out, RequestMetrics metrics) {
this.metrics = metrics;
this.out = out;
}
@@ -132,7 +133,7 @@ public class DataStreamManagement {
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request,
Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(sendFuture, executor,
- n -> out.writeAsync(request.slice().nioBuffer(),
addFlush(request.getWriteOptionList()))
+ n -> out.writeAsync(request.slice().retain(),
addFlush(request.getWriteOptionList()))
.whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
@@ -147,7 +148,7 @@ public class DataStreamManagement {
= new AtomicReference<>(CompletableFuture.completedFuture(null));
StreamInfo(RaftClientRequest request, boolean primary,
CompletableFuture<DataStream> stream, RaftServer server,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams,
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputImpl>, IOException> getStreams,
Function<RequestType, RequestMetrics> metricsConstructor)
throws IOException {
this.request = request;
@@ -155,7 +156,7 @@ public class DataStreamManagement {
this.local = new LocalStream(stream,
metricsConstructor.apply(RequestType.LOCAL_WRITE));
this.server = server;
final Set<RaftPeer> successors = getSuccessors(server.getId());
- final Set<DataStreamOutputRpc> outs = getStreams.apply(request,
successors);
+ final Set<DataStreamOutputImpl> outs = getStreams.apply(request,
successors);
this.remotes = outs.stream()
.map(o -> new RemoteStream(o,
metricsConstructor.apply(RequestType.REMOTE_WRITE)))
.collect(Collectors.toSet());
@@ -315,7 +316,7 @@ public class DataStreamManagement {
}
private StreamInfo newStreamInfo(ByteBuf buf,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputImpl>, IOException> getStreams) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
@@ -449,7 +450,7 @@ public class DataStreamManagement {
}
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputImpl>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
try {
readImpl(request, ctx, getStreams);
@@ -459,7 +460,7 @@ public class DataStreamManagement {
}
private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputImpl>, IOException> getStreams) {
final boolean close =
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key =
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index c5f24b058..451040bb6 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,9 +20,11 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
@@ -90,8 +92,8 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
map.addRaftPeers(newPeers);
}
- Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request,
Set<RaftPeer> peers) throws IOException {
- final Set<DataStreamOutputRpc> outs = new HashSet<>();
+ Set<DataStreamOutputImpl> getDataStreamOutput(RaftClientRequest request,
Set<RaftPeer> peers) throws IOException {
+ final Set<DataStreamOutputImpl> outs = new HashSet<>();
try {
getDataStreamOutput(request, peers, outs);
} catch (IOException e) {
@@ -101,11 +103,11 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return outs;
}
- private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer>
peers, Set<DataStreamOutputRpc> outs)
+ private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer>
peers, Set<DataStreamOutputImpl> outs)
throws IOException {
for (RaftPeer peer : peers) {
try {
- outs.add((DataStreamOutputRpc)
map.computeIfAbsent(peer).get().stream(request));
+ outs.add((DataStreamOutputImpl)
map.computeIfAbsent(peer).get().stream(request));
} catch (IOException e) {
map.handleException(peer.getId(), e, true);
throw new IOException(map.getName() + ": Failed to
getDataStreamOutput for " + peer, e);
@@ -238,7 +240,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
requests.cleanUpOnChannelInactive(ctx.channel().id(),
channelInactiveGracePeriod);
}