runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507501923
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
private final StateMachine stateMachine;
private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
+
+ private List<DataStreamClient> clients = new ArrayList<>();
public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
this.raftServer = server;
this.stateMachine = stateMachine;
this.channelFuture = buildChannel();
}
- private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf,
AtomicBoolean released) {
+ public NettyServerStreamRpc(
+ RaftPeer server, List<RaftPeer> otherPeers,
+ StateMachine stateMachine, RaftProperties properties){
+ this(server, stateMachine);
+ setupClient(otherPeers, properties);
+ }
+
+ private List<DataStreamOutput> getDataStreamOutput() {
+ return clients.stream().map(client ->
client.stream()).collect(Collectors.toList());
+ }
+
+ private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
try {
final RaftClientRequest request =
ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
return stateMachine.data().stream(request);
} catch (InvalidProtocolBufferException e) {
throw new CompletionException(e);
- } finally {
- buf.release();
- released.set(true);
}
}
- private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
- if (released) {
+ private long writeTo(ByteBuf buf, DataStream stream) {
+ if (stream == null) {
return 0;
}
- try {
- if (stream == null) {
- return 0;
- }
- final WritableByteChannel channel = stream.getWritableByteChannel();
- long byteWritten = 0;
- for (ByteBuffer buffer : buf.nioBuffers()) {
- try {
- byteWritten += channel.write(buffer);
- } catch (Throwable t) {
- throw new CompletionException(t);
- }
+ final WritableByteChannel channel = stream.getWritableByteChannel();
+ long byteWritten = 0;
+ for (ByteBuffer buffer : buf.nioBuffers()) {
+ try {
+ byteWritten += channel.write(buffer);
+ } catch (Throwable t) {
+ throw new CompletionException(t);
}
- return byteWritten;
- } finally {
- buf.release();
}
+ return byteWritten;
}
- private void sendReply(DataStreamRequestByteBuf request, long byteWritten,
ChannelHandlerContext ctx) {
+ private void sendReply(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx, ByteBuf buf) {
// TODO RATIS-1098: include byteWritten and isSuccess in the reply
- final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(),
ByteBuffer.wrap("OK".getBytes()));
- ctx.writeAndFlush(reply);
+ try {
+ final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+ request.getStreamId(), request.getStreamOffset(),
ByteBuffer.wrap("OK".getBytes()));
+ ctx.writeAndFlush(reply);
+ } finally {
+ buf.release();
Review comment:
Okay, I see. Maybe we can rename sendReply, because having sendReply also be
in charge of buf.release seems confusing.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org