This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f91ef63  RATIS-1143. Return exception of submitClientRequestAsync to 
client (#270)
f91ef63 is described below

commit f91ef63809b21ed65445f4a60d8005ce0a1af8ca
Author: runzhiwang <[email protected]>
AuthorDate: Thu Nov 12 14:00:14 2020 +0800

    RATIS-1143. Return exception of submitClientRequestAsync to client (#270)
---
 .../ratis/netty/server/NettyServerStreamRpc.java   | 67 +++++++++++++++++-----
 .../ratis/datastream/DataStreamBaseTest.java       | 10 +++-
 .../ratis/datastream/TestDataStreamNetty.java      | 34 ++++++++---
 3 files changed, 86 insertions(+), 25 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 14411ab..d7d3f7c 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -26,10 +26,12 @@ import 
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
@@ -37,6 +39,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
@@ -73,6 +76,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = 
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -155,17 +159,15 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       return out.writeAsync(request.slice().nioBuffer());
     }
 
-    CompletableFuture<Boolean> startTransaction(DataStreamRequestByteBuf 
request, ChannelHandlerContext ctx,
+    CompletableFuture<DataStreamReply> 
startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
         Executor executor) {
       return out.startTransactionAsync().thenApplyAsync(reply -> {
         if (reply.isSuccess()) {
           final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
               ((DataStreamReplyByteBuffer)reply).slice(): null;
           sendReplySuccess(request, buffer, -1, ctx);
-          return true;
-        } else {
-          return false;
         }
+        return reply;
       }, executor);
     }
 
@@ -347,9 +349,9 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     }
   }
 
-  private void sendReplyNotSuccess(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx) {
+  private void sendReplyNotSuccess(DataStreamRequestByteBuf request, 
ByteBuffer buffer, ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), null, -1, false, 
request.getType());
+        request.getStreamId(), request.getStreamOffset(), buffer, -1, false, 
request.getType());
     ctx.writeAndFlush(reply);
   }
 
@@ -363,7 +365,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
       DataStreamRequestByteBuf request, long bytesWritten, 
ChannelHandlerContext ctx) {
       if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
-        sendReplyNotSuccess(request, ctx);
+        sendReplyNotSuccess(request, null, ctx);
       } else {
         sendReplySuccess(request, null, bytesWritten, ctx);
       }
@@ -388,31 +390,68 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the 
other peers
           // there maybe other unexpected reason cause failure except not 
leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(final 
List<CompletableFuture<DataStreamReply>> results,
+      final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx, 
RaftClientReply localReply) {
+    // get replies from the results, ignored exceptional replies
+    final Stream<RaftClientReply> remoteReplies = results.stream()
+        .filter(r -> !r.isCompletedExceptionally())
+        .map(CompletableFuture::join)
+        .map(this::getRaftClientReply);
+
+    // choose the leader's reply if there is any.  Otherwise, use the local 
reply
+    final RaftClientReply chosen = Stream.concat(Stream.of(localReply), 
remoteReplies)
+        .filter(reply -> reply.getNotLeaderException() == null)
+        .findAny().orElse(localReply);
+
+    // send reply
+    final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
+    sendReplyNotSuccess(request, buffer, ctx);
+  }
+
   private void forwardStartTransaction(
-      final StreamInfo info, final DataStreamRequestByteBuf request, final 
ChannelHandlerContext ctx) {
-    final List<CompletableFuture<Boolean>> results = info.applyToRemotes(
+      final StreamInfo info, final DataStreamRequestByteBuf request,
+      final ChannelHandlerContext ctx, RaftClientReply localReply) {
+    final List<CompletableFuture<DataStreamReply>> results = 
info.applyToRemotes(
         out -> out.startTransaction(request, ctx, executor));
 
     JavaUtils.allOf(results).thenAccept(v -> {
-      if (!results.stream().map(CompletableFuture::join).reduce(false, 
Boolean::logicalOr)) {
-        sendReplyNotSuccess(request, ctx);
+      for (CompletableFuture<DataStreamReply> result : results) {
+        if (result.join().isSuccess()) {
+          return;
+        }
       }
+
+      sendLeaderFailedReply(results, request, ctx, localReply);
     });
   }
 
+  private RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
+    if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
+      try {
+        return ClientProtoUtils.toRaftClientReply(
+            RaftClientReplyProto.parseFrom(((DataStreamReplyByteBuffer) 
dataStreamReply).slice()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalStateException(this + ": Failed to decode 
RaftClientReply");
+      }
+    } else {
+      throw new IllegalStateException(this + ": Unexpected reply type");
+    }
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf 
request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 1b8d261..33a90a1 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -400,7 +400,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
 
-  void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply 
expectedClientReply)
+  void runTestMockCluster(int bufferSize, int bufferNum, RaftClientReply 
expectedClientReply)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream()) {
       final DataStreamOutputImpl out = (DataStreamOutputImpl) 
client.getDataStreamApi().stream();
@@ -409,10 +409,16 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
           RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
-      Assert.assertTrue(replyByteBuffer.isSuccess());
       Assert.assertEquals(clientReply.getCallId(), 
expectedClientReply.getCallId());
       Assert.assertEquals(clientReply.getClientId(), 
expectedClientReply.getClientId());
       Assert.assertEquals(clientReply.getLogIndex(), 
expectedClientReply.getLogIndex());
+      if (expectedClientReply.getException() != null) {
+        Assert.assertFalse(replyByteBuffer.isSuccess());
+        Assert.assertTrue(clientReply.getException().getMessage().contains(
+            expectedClientReply.getException().getMessage()));
+      } else {
+        Assert.assertTrue(replyByteBuffer.isSuccess());
+      }
     }
   }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index f9fce8b..7ae9860 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -29,6 +29,8 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Before;
@@ -78,15 +80,15 @@ public class TestDataStreamNetty extends DataStreamBaseTest 
{
     runTestDataStream(3);
   }
 
-  private void testCloseStream(int leaderIndex, int numServers) throws 
Exception {
+  private void testMockCluster(int leaderIndex, int numServers, RaftException 
leaderException) throws Exception {
     List<RaftServer> raftServers = new ArrayList<>();
     ClientId clientId = ClientId.randomId();
     RaftGroupId groupId = RaftGroupId.randomId();
     long callId = 100;
     long longIndex = 200;
     final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" + 
leaderIndex).build();
-    RaftClientReply expectedClientReply = new RaftClientReply(clientId, 
suggestedLeader.getId(),
-        groupId, callId, true, null, null, longIndex, null);
+    RaftClientReply expectedClientReply = new RaftClientReply(clientId, 
suggestedLeader.getId(), groupId, callId,
+        leaderException == null ? true : false, null, leaderException, 
longIndex, null);
 
     for (int i = 0; i < numServers; i ++) {
       RaftServer raftServer = mock(RaftServer.class);
@@ -114,17 +116,17 @@ public class TestDataStreamNetty extends 
DataStreamBaseTest {
       raftServers.add(raftServer);
     }
 
-    runTestCloseStream(raftServers, 1_000_000, 10, expectedClientReply);
+    runTestMockCluster(raftServers, 1_000_000, 10, expectedClientReply);
   }
 
-  void runTestCloseStream(List<RaftServer> raftServers, int bufferSize, int 
bufferNum,
+  void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int 
bufferNum,
       RaftClientReply expectedClientReply) throws Exception {
     try {
       final List<RaftPeer> peers = raftServers.stream()
           .map(TestDataStreamNetty::newRaftPeer)
           .collect(Collectors.toList());
       setup(peers, raftServers);
-      runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
+      runTestMockCluster(bufferSize, bufferNum, expectedClientReply);
     } finally {
       shutdown();
     }
@@ -133,18 +135,32 @@ public class TestDataStreamNetty extends 
DataStreamBaseTest {
   @Test
   public void testCloseStreamPrimaryIsLeader() throws Exception {
     // primary is 0, leader is 0
-    testCloseStream(0, 3);
+    testMockCluster(0, 3, null);
   }
 
   @Test
   public void testCloseStreamPrimaryIsNotLeader() throws Exception {
     // primary is 0, leader is 1
-    testCloseStream(1, 3);
+    testMockCluster(1, 3, null);
   }
 
   @Test
   public void testCloseStreamOneServer() throws Exception {
     // primary is 0, leader is 0
-    testCloseStream(0, 1);
+    testMockCluster(0, 1, null);
+  }
+
+  @Test
+  public void testExceptionInReplyPrimaryIsLeader() throws Exception {
+    // primary is 0, leader is 0
+    StateMachineException stateMachineException = new 
StateMachineException("leader throw StateMachineException");
+    testMockCluster(0, 3, stateMachineException);
+  }
+
+  @Test
+  public void testExceptionInReplyPrimaryIsNotLeader() throws Exception {
+    // primary is 0, leader is 1
+    StateMachineException stateMachineException = new 
StateMachineException("leader throw StateMachineException");
+    testMockCluster(1, 3, stateMachineException);
   }
 }

Reply via email to