szetszwo commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r515794132
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (info.getDataStreamOutputs().size() > 0) {
Review comment:
When info.getDataStreamOutputs().size() == 0, it should call
submitClientRequestAsync (i.e. primaryServerStartTransaction).
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
Review comment:
It should use the same executor as the write method.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
Review comment:
Let's wait for RATIS-1126 and pass an executor to thenApplyAsync.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (info.getDataStreamOutputs().size() > 0) {
Review comment:
How about the group only has one server? It seems that the primary
server won't call submitClientRequestAsync if there are no other peers.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +214,8 @@ StreamInfo get(Key key) {
private final ExecutorService executorService;
+ private boolean primary;
Review comment:
This won't work. A server may be a primary and non-primary at the same
time.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null,
executorService)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (isPrimary()) {
+ // after all server close stream, primary server start
transaction
+ // TODO(runzhiwang): send start transaction to leader directly
+ primaryServerStartTransaction(info, request, ctx);
+ } else {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ }
Review comment:
We may combine primaryServerStartTransaction and
peerServerStartTransaction to a single method.
```
private void startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
try {
server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
if (reply.isSuccess()) {
ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
sendReplySuccess(request, buffer, -1, ctx);
} else if (info.getDataStreamOutputs().size() > 0) {
// if this server is not the leader, forward start transition to
the other peers
forwardStartTransaction(info, request, ctx);
} else {
sendReplyNotSuccess(request, ctx);
}
});
} catch (IOException e) {
sendReplyNotSuccess(request, ctx);
}
}
```
Then, we don't need markPrimary().
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
};
}
+ private void primaryServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ // if primary server is not the leader, primary ask all the other
peers to start transaction
+ askPeerStartTransaction(info, request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
+ private void askPeerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
+ private void peerServerStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ info.getStream().thenApplyAsync(stream -> {
+ try {
+
server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return null;
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ info = streams.get(key);
+ localWrite = info.getStream().thenApplyAsync(stream -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ });
+
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.closeAsync());
+ }
+ } else {
+ // peer server start transaction
+ peerServerStartTransaction(streams.get(key), request, ctx);
+ return;
}
final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
final CompletableFuture<?> current = previous.get()
.thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null,
executorService)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER || request.getType() ==
Type.STREAM_DATA) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ if (isPrimary()) {
+ // after all server close stream, primary server start
transaction
+ // TODO(runzhiwang): send start transaction to leader directly
+ primaryServerStartTransaction(info, request, ctx);
+ } else {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ }
Review comment:
I see. Then, we need to distinguish primary and other peers in the code.
We may change the DataStreamPacketHeaderProto.Type. We may split STREAM_CLOSE
into two types
- STREAM_CLOSE
- STREAM_CLOSE_FORWARDED
When the client sends to the primary, it uses STREAM_CLOSE. When the
primary sends to the other server, it uses STREAM_CLOSE_FORWARDED.
##########
File path: ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
##########
@@ -22,9 +22,12 @@
/** Support the {@link CloseAsync#closeAsync()} method. */
public interface CloseAsync<REPLY> extends AutoCloseable {
- /** Close asynchronously. */
+ /** Primary close asynchronously. */
Review comment:
Please don't this javadoc. CloseAsync is general API for many
different classes.
##########
File path: ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
##########
@@ -22,9 +22,12 @@
/** Support the {@link CloseAsync#closeAsync()} method. */
public interface CloseAsync<REPLY> extends AutoCloseable {
- /** Close asynchronously. */
+ /** Primary close asynchronously. */
CompletableFuture<REPLY> closeAsync();
+ /** Peer close asynchronously. */
+ CompletableFuture<REPLY> closeForwardAsync();
+
Review comment:
Please don't add it here since it is specific to Streaming.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]