This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ad3cd5a RATIS-1104.Should check "bytewritten" from remote writes to
peers to determine whether current stream write is successful (#230)
ad3cd5a is described below
commit ad3cd5ae6c731d2f797890614a4f7bbcaa3ae818
Author: Rui Wang <[email protected]>
AuthorDate: Thu Oct 22 00:41:20 2020 -0700
RATIS-1104.Should check "bytewritten" from remote writes to peers to
determine whether current stream write is successful (#230)
* RATIS-1104.Should check "bytewritten" from remote writes to peers to
determine whether current stream write is successful
* fixup! reorder imports
* fixup! address comments
* fixup! address comments
* fixup! fix style
---
.../ratis/netty/server/NettyServerStreamRpc.java | 45 +++++++++++++++++++---
1 file changed, 39 insertions(+), 6 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 59ec77b..54baea6 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -40,6 +40,7 @@ import
org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
+import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.PeerProxyMap;
@@ -58,6 +59,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class NettyServerStreamRpc implements DataStreamServerRpc {
@@ -176,21 +178,40 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return byteWritten;
}
- private void sendReply(DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
+ private void sendReplyNotSuccess(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
+ final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+ request.getStreamId(), request.getStreamOffset(), null, -1, false);
+ ctx.writeAndFlush(reply);
+ }
+
+ private void sendReplySuccess(DataStreamRequestByteBuf request, long
bytesWritten, ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
request.getStreamId(), request.getStreamOffset(), null, bytesWritten,
true);
ctx.writeAndFlush(reply);
}
+ private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
+ DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
+ try {
+ if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
+ sendReplyNotSuccess(request, ctx);
+ } else {
+ sendReplySuccess(request, bytesWritten, ctx);
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ IOUtils.asIOException(e);
+ }
+ }
+
private ChannelInboundHandler getServerHandler(){
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
IOException {
- final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
+ final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)
msg;
final ByteBuf buf = request.slice();
final boolean isHeader = request.getStreamOffset() == -1;
- final CompletableFuture<Long> localWrite = isHeader?
+ final CompletableFuture<Long> localWrite = isHeader ?
streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf)).thenApply(stream -> 0L)
: streams.get(request.getStreamId()).thenApply(stream ->
writeTo(buf, stream));
@@ -199,25 +220,37 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
// do not need to forward header request
final List<DataStreamOutput> outs = proxies.getDataStreamOutput();
peersStreamOutput.put(request.getStreamId(), outs);
- for(DataStreamOutput out : outs) {
+ for (DataStreamOutput out : outs) {
remoteWrites.add(out.getHeaderFuture());
}
} else {
// body
- for(DataStreamOutput out :
peersStreamOutput.get(request.getStreamId())) {
+ for (DataStreamOutput out :
peersStreamOutput.get(request.getStreamId())) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
}
JavaUtils.allOf(remoteWrites).thenCombine(localWrite, (v,
bytesWritten) -> {
buf.release();
- sendReply(request, bytesWritten, ctx);
+ sendReply(remoteWrites, request, bytesWritten, ctx);
return null;
});
}
};
}
+ private boolean checkSuccessRemoteWrite(
+ List<CompletableFuture<DataStreamReply>> replyFutures, long
bytesWritten)
+ throws ExecutionException, InterruptedException {
+ for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
+ DataStreamReply reply = replyFuture.get();
+ if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private ChannelInitializer<SocketChannel> getInitializer(){
return new ChannelInitializer<SocketChannel>(){
@Override