This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f91ef63 RATIS-1143. Return exception of submitClientRequestAsync to
client (#270)
f91ef63 is described below
commit f91ef63809b21ed65445f4a60d8005ce0a1af8ca
Author: runzhiwang <[email protected]>
AuthorDate: Thu Nov 12 14:00:14 2020 +0800
RATIS-1143. Return exception of submitClientRequestAsync to client (#270)
---
.../ratis/netty/server/NettyServerStreamRpc.java | 67 +++++++++++++++++-----
.../ratis/datastream/DataStreamBaseTest.java | 10 +++-
.../ratis/datastream/TestDataStreamNetty.java | 34 ++++++++---
3 files changed, 86 insertions(+), 25 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 14411ab..d7d3f7c 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -26,10 +26,12 @@ import
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
@@ -37,6 +39,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.*;
@@ -73,6 +76,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -155,17 +159,15 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return out.writeAsync(request.slice().nioBuffer());
}
- CompletableFuture<Boolean> startTransaction(DataStreamRequestByteBuf
request, ChannelHandlerContext ctx,
+ CompletableFuture<DataStreamReply>
startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
Executor executor) {
return out.startTransactionAsync().thenApplyAsync(reply -> {
if (reply.isSuccess()) {
final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
((DataStreamReplyByteBuffer)reply).slice(): null;
sendReplySuccess(request, buffer, -1, ctx);
- return true;
- } else {
- return false;
}
+ return reply;
}, executor);
}
@@ -347,9 +349,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
}
- private void sendReplyNotSuccess(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
+ private void sendReplyNotSuccess(DataStreamRequestByteBuf request,
ByteBuffer buffer, ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), null, -1, false,
request.getType());
+ request.getStreamId(), request.getStreamOffset(), buffer, -1, false,
request.getType());
ctx.writeAndFlush(reply);
}
@@ -363,7 +365,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
- sendReplyNotSuccess(request, ctx);
+ sendReplyNotSuccess(request, null, ctx);
} else {
sendReplySuccess(request, null, bytesWritten, ctx);
}
@@ -388,31 +390,68 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
} else if (request.getType() == Type.STREAM_CLOSE) {
// if this server is not the leader, forward start transition to the
other peers
// there maybe other unexpected reason cause failure except not
leader, forwardStartTransaction anyway
- forwardStartTransaction(info, request, ctx);
+ forwardStartTransaction(info, request, ctx, reply);
} else if (request.getType() == Type.START_TRANSACTION){
- sendReplyNotSuccess(request, ctx);
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplyNotSuccess(request, buffer, ctx);
} else {
throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
}
}, executor);
} catch (IOException e) {
- sendReplyNotSuccess(request, ctx);
+ sendReplyNotSuccess(request, null, ctx);
return CompletableFuture.completedFuture(null);
}
}
+ private void sendLeaderFailedReply(final
List<CompletableFuture<DataStreamReply>> results,
+ final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx,
RaftClientReply localReply) {
+ // get replies from the results, ignored exceptional replies
+ final Stream<RaftClientReply> remoteReplies = results.stream()
+ .filter(r -> !r.isCompletedExceptionally())
+ .map(CompletableFuture::join)
+ .map(this::getRaftClientReply);
+
+ // choose the leader's reply if there is any. Otherwise, use the local
reply
+ final RaftClientReply chosen = Stream.concat(Stream.of(localReply),
remoteReplies)
+ .filter(reply -> reply.getNotLeaderException() == null)
+ .findAny().orElse(localReply);
+
+ // send reply
+ final ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
+ sendReplyNotSuccess(request, buffer, ctx);
+ }
+
private void forwardStartTransaction(
- final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
- final List<CompletableFuture<Boolean>> results = info.applyToRemotes(
+ final StreamInfo info, final DataStreamRequestByteBuf request,
+ final ChannelHandlerContext ctx, RaftClientReply localReply) {
+ final List<CompletableFuture<DataStreamReply>> results =
info.applyToRemotes(
out -> out.startTransaction(request, ctx, executor));
JavaUtils.allOf(results).thenAccept(v -> {
- if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
- sendReplyNotSuccess(request, ctx);
+ for (CompletableFuture<DataStreamReply> result : results) {
+ if (result.join().isSuccess()) {
+ return;
+ }
}
+
+ sendLeaderFailedReply(results, request, ctx, localReply);
});
}
+ private RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
+ if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
+ try {
+ return ClientProtoUtils.toRaftClientReply(
+ RaftClientReplyProto.parseFrom(((DataStreamReplyByteBuffer)
dataStreamReply).slice()));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException(this + ": Failed to decode
RaftClientReply");
+ }
+ } else {
+ throw new IllegalStateException(this + ": Unexpected reply type");
+ }
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 1b8d261..33a90a1 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -400,7 +400,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
- void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply
expectedClientReply)
+ void runTestMockCluster(int bufferSize, int bufferNum, RaftClientReply
expectedClientReply)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream()) {
final DataStreamOutputImpl out = (DataStreamOutputImpl)
client.getDataStreamApi().stream();
@@ -409,10 +409,16 @@ abstract class DataStreamBaseTest extends BaseTest {
final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
- Assert.assertTrue(replyByteBuffer.isSuccess());
Assert.assertEquals(clientReply.getCallId(),
expectedClientReply.getCallId());
Assert.assertEquals(clientReply.getClientId(),
expectedClientReply.getClientId());
Assert.assertEquals(clientReply.getLogIndex(),
expectedClientReply.getLogIndex());
+ if (expectedClientReply.getException() != null) {
+ Assert.assertFalse(replyByteBuffer.isSuccess());
+ Assert.assertTrue(clientReply.getException().getMessage().contains(
+ expectedClientReply.getException().getMessage()));
+ } else {
+ Assert.assertTrue(replyByteBuffer.isSuccess());
+ }
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index f9fce8b..7ae9860 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -29,6 +29,8 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.NetUtils;
import org.junit.Before;
@@ -78,15 +80,15 @@ public class TestDataStreamNetty extends DataStreamBaseTest
{
runTestDataStream(3);
}
- private void testCloseStream(int leaderIndex, int numServers) throws
Exception {
+ private void testMockCluster(int leaderIndex, int numServers, RaftException
leaderException) throws Exception {
List<RaftServer> raftServers = new ArrayList<>();
ClientId clientId = ClientId.randomId();
RaftGroupId groupId = RaftGroupId.randomId();
long callId = 100;
long longIndex = 200;
final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" +
leaderIndex).build();
- RaftClientReply expectedClientReply = new RaftClientReply(clientId,
suggestedLeader.getId(),
- groupId, callId, true, null, null, longIndex, null);
+ RaftClientReply expectedClientReply = new RaftClientReply(clientId,
suggestedLeader.getId(), groupId, callId,
+ leaderException == null ? true : false, null, leaderException,
longIndex, null);
for (int i = 0; i < numServers; i ++) {
RaftServer raftServer = mock(RaftServer.class);
@@ -114,17 +116,17 @@ public class TestDataStreamNetty extends
DataStreamBaseTest {
raftServers.add(raftServer);
}
- runTestCloseStream(raftServers, 1_000_000, 10, expectedClientReply);
+ runTestMockCluster(raftServers, 1_000_000, 10, expectedClientReply);
}
- void runTestCloseStream(List<RaftServer> raftServers, int bufferSize, int
bufferNum,
+ void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int
bufferNum,
RaftClientReply expectedClientReply) throws Exception {
try {
final List<RaftPeer> peers = raftServers.stream()
.map(TestDataStreamNetty::newRaftPeer)
.collect(Collectors.toList());
setup(peers, raftServers);
- runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
+ runTestMockCluster(bufferSize, bufferNum, expectedClientReply);
} finally {
shutdown();
}
@@ -133,18 +135,32 @@ public class TestDataStreamNetty extends
DataStreamBaseTest {
@Test
public void testCloseStreamPrimaryIsLeader() throws Exception {
// primary is 0, leader is 0
- testCloseStream(0, 3);
+ testMockCluster(0, 3, null);
}
@Test
public void testCloseStreamPrimaryIsNotLeader() throws Exception {
// primary is 0, leader is 1
- testCloseStream(1, 3);
+ testMockCluster(1, 3, null);
}
@Test
public void testCloseStreamOneServer() throws Exception {
// primary is 0, leader is 0
- testCloseStream(0, 1);
+ testMockCluster(0, 1, null);
+ }
+
+ @Test
+ public void testExceptionInReplyPrimaryIsLeader() throws Exception {
+ // primary is 0, leader is 0
+ StateMachineException stateMachineException = new
StateMachineException("leader throw StateMachineException");
+ testMockCluster(0, 3, stateMachineException);
+ }
+
+ @Test
+ public void testExceptionInReplyPrimaryIsNotLeader() throws Exception {
+ // primary is 0, leader is 1
+ StateMachineException stateMachineException = new
StateMachineException("leader throw StateMachineException");
+ testMockCluster(1, 3, stateMachineException);
}
}