This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 074a495 RATIS-1277. Fix FileStore write failed because out of order
(#389)
074a495 is described below
commit 074a495eda03392ad7a02d0ca799f96843fae5f4
Author: runzhiwang <[email protected]>
AuthorDate: Thu Dec 31 18:41:05 2020 +0800
RATIS-1277. Fix FileStore write failed because out of order (#389)
---
.../apache/ratis/netty/client/NettyClientStreamRpc.java | 10 +++++++---
.../apache/ratis/netty/server/DataStreamManagement.java | 15 +++++++++++++--
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 0c369f5..551810f 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -54,7 +55,8 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private final String name;
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final Supplier<Channel> channel;
- private final ConcurrentMap<Long, Queue<CompletableFuture<DataStreamReply>>>
replies = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ClientInvocationId,
Queue<CompletableFuture<DataStreamReply>>> replies =
+ new ConcurrentHashMap<>();
public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
@@ -82,7 +84,8 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
final DataStreamReply reply = (DataStreamReply) msg;
LOG.debug("{}: read {}", this, reply);
- Optional.ofNullable(replies.get(reply.getStreamId()))
+ ClientInvocationId clientInvocationId =
ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
+ Optional.ofNullable(replies.get(clientInvocationId))
.map(Queue::poll)
.ifPresent(f -> f.complete(reply));
}
@@ -136,8 +139,9 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
@Override
public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest
request) {
final CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
+ ClientInvocationId clientInvocationId =
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final Queue<CompletableFuture<DataStreamReply>> q =
replies.computeIfAbsent(
- request.getStreamId(), key -> new ConcurrentLinkedQueue<>());
+ clientInvocationId, key -> new ConcurrentLinkedQueue<>());
if (!q.offer(f)) {
f.completeExceptionally(new IllegalStateException(this + ": Failed to
offer a future for " + request));
return f;
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 43a6096..d30166a 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -48,6 +48,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -284,7 +285,7 @@ public class DataStreamManagement {
static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
- final boolean success = checkSuccessRemoteWrite(remoteWrites,
bytesWritten);
+ final boolean success = checkSuccessRemoteWrite(remoteWrites,
bytesWritten, request);
final DataStreamReplyByteBuffer.Builder builder =
DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setSuccess(success);
@@ -398,9 +399,19 @@ public class DataStreamManagement {
});
}
- static boolean
checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures,
long bytesWritten) {
+ static void assertReplyCorrespondingToRequest(
+ final DataStreamRequestByteBuf request, final DataStreamReply reply) {
+
Preconditions.assertTrue(request.getClientId().equals(reply.getClientId()));
+ Preconditions.assertTrue(request.getType() == reply.getType());
+ Preconditions.assertTrue(request.getStreamId() == reply.getStreamId());
+ Preconditions.assertTrue(request.getStreamOffset() ==
reply.getStreamOffset());
+ }
+
+ static boolean
checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures,
long bytesWritten,
+ final DataStreamRequestByteBuf request) {
for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
final DataStreamReply reply = replyFuture.join();
+ assertReplyCorrespondingToRequest(request, reply);
if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
return false;
}