runzhiwang commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r515819031
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (info.getDataStreamOutputs().size() > 0) {
Review comment:
@szetszwo Sorry. When info.getDataStreamOutputs().size() == 0, it means
this server is a peer; it only needs to respond to the primary server with the
close-stream result. Maybe it does not need to call primaryServerStartTransaction.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (info.getDataStreamOutputs().size() > 0) {
Review comment:
@szetszwo Thanks for the reminder. You are right, so I think we need to
mark which server is the primary:
https://github.com/apache/incubator-ratis/pull/237/commits/7c578d754385734b98e56cdd65705c06ee381b10
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (info.getDataStreamOutputs().size() > 0) {
Review comment:
@szetszwo Thanks for the reminder. You are right, so I think we need to
mark which server is the primary:
https://github.com/apache/incubator-ratis/pull/237/commits/7c578d754385734b98e56cdd65705c06ee381b10.
Otherwise, we cannot distinguish a peer in a cluster with 3 servers from the
primary in a cluster with only one server.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (info.getDataStreamOutputs().size() > 0) {
Review comment:
@szetszwo Thanks for the reminder. You are right, so I think we need to
mark which server is the primary:
https://github.com/apache/incubator-ratis/pull/237/commits/7c578d754385734b98e56cdd65705c06ee381b10.
Otherwise, we cannot distinguish a peer in a cluster with 3 servers from the
primary in a cluster with only one server. In both cases,
info.getDataStreamOutputs().size() == 0.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null,
executorService)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (isPrimary()) {
+ // after all server close stream, primary server start
transaction
+ // TODO(runzhiwang): send start transaction to leader directly
+ primaryServerStartTransaction(info, request, ctx);
+ } else {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ }
Review comment:
@szetszwo Thanks for the suggestions; I have updated the patch. But this
still does not address the case where the primary server won't call
submitClientRequestAsync if there are no other peers. The problem is: 1. When a
peer finishes closing the stream, it must reply success to the primary and must
not call submitClientRequestAsync. 2. When the primary finishes closing the
stream and has received the success replies from the peers, it calls
submitClientRequestAsync. So the peer's and the primary's behavior after
closing the stream is different, and we must decide whether the current server
is the primary or a peer; but for a single server without other peers, we
cannot decide whether it is the primary or a peer.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null,
executorService)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (isPrimary()) {
+ // after all server close stream, primary server start
transaction
+ // TODO(runzhiwang): send start transaction to leader directly
+ primaryServerStartTransaction(info, request, ctx);
+ } else {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ }
Review comment:
@szetszwo Thanks for the suggestion. Updated.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ try {
+ server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply
-> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ if (request.getType() == Type.STREAM_CLOSE) {
+ // if this server is not the leader, forward start transition to
the other peers
+ forwardStartTransaction(info, request, ctx);
+ } else if (request.getType() == Type.START_TRANSACTION){
+ sendReplyNotSuccess(request, ctx);
+ } else {
+ LOG.error("{}: Unexpected type:{}", this, request.getType());
+ }
+ }
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ }
+
+ private void forwardStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeForwardAsync());
+ }
+ } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+ } else {
+ // peer server start transaction
+ startTransaction(streams.get(key), request, ctx);
Review comment:
@amaliujia We call startTransaction only after the stream has finished
closing, so there is no need to use previous.get(). Closing a stream is split
into two phases: close stream and start transaction. The server replies only
after start transaction has finished.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ try {
+ server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply
-> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ if (request.getType() == Type.STREAM_CLOSE) {
+ // if this server is not the leader, forward start transition to
the other peers
+ forwardStartTransaction(info, request, ctx);
Review comment:
@amaliujia We cannot conclude that. Not being the leader is only one case;
there may be other unexpected causes. But we should call
forwardStartTransaction anyway.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ try {
+ server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply
-> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ if (request.getType() == Type.STREAM_CLOSE) {
+ // if this server is not the leader, forward start transition to
the other peers
+ forwardStartTransaction(info, request, ctx);
Review comment:
Let me update the comment
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ try {
+ server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply
-> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ if (request.getType() == Type.STREAM_CLOSE) {
+ // if this server is not the leader, forward start transition to
the other peers
+ forwardStartTransaction(info, request, ctx);
+ } else if (request.getType() == Type.START_TRANSACTION){
+ sendReplyNotSuccess(request, ctx);
+ } else {
+ LOG.error("{}: Unexpected type:{}", this, request.getType());
+ }
+ }
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ }
+
+ private void forwardStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeForwardAsync());
+ }
+ } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+ } else {
+ // peer server start transaction
+ startTransaction(streams.get(key), request, ctx);
Review comment:
@amaliujia No. Please see here:
client —(close-stream)—> primary server —(close-stream)—> other servers
primary server <—(ack-close-stream)— other servers
primary server —(start-transaction)—> other servers
client <—(RaftClientReply)— primary server <—(RaftClientReply)— leader
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ try {
+ server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply
-> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ if (request.getType() == Type.STREAM_CLOSE) {
Review comment:
Updated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]