szetszwo commented on a change in pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#discussion_r521440946



##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the 
other peers
           // there maybe other unexpected reason cause failure except not 
leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf 
request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && 
reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find 
suggestedLeader:" + suggestedLeader.getId());
+    }
+  }
+
   private void forwardStartTransaction(
-      final StreamInfo info, final DataStreamRequestByteBuf request, final 
ChannelHandlerContext ctx) {
-    final List<CompletableFuture<Boolean>> results = info.applyToRemotes(
+      final StreamInfo info, final DataStreamRequestByteBuf request,
+      final ChannelHandlerContext ctx, RaftClientReply reply) {
+    final List<CompletableFuture<DataStreamReply>> results = 
info.applyToRemotes(
         out -> out.startTransaction(request, ctx, executor));
 
     JavaUtils.allOf(results).thenAccept(v -> {
-      if (!results.stream().map(CompletableFuture::join).reduce(false, 
Boolean::logicalOr)) {
-        sendReplyNotSuccess(request, ctx);
+      for (CompletableFuture<DataStreamReply> result : results) {
+        if (result.join().isSuccess()) {
+          return;
+        }
       }
+
+      List<RaftClientReply> replies = new ArrayList<>();
+      replies.add(reply);
+
+      for (CompletableFuture<DataStreamReply> result : results) {
+        replies.add(getRaftClientReply(result.join()));
+      }

Review comment:
       Move them to sendLeaderFailedReply.

##########
File path: 
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the 
other peers
           // there maybe other unexpected reason cause failure except not 
leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf 
request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && 
reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find 
suggestedLeader:" + suggestedLeader.getId());
+    }
+  }

Review comment:
       The suggestedLeader is unreliable since a stale server may suggest an 
old leader.  We may use getNotLeaderException() to find out the leader reply.
   
   Also, the exceptional results should be removed.
   ```
     private void sendLeaderFailedReply(final 
List<CompletableFuture<DataStreamReply>> results,
         final DataStreamRequestByteBuf request, final ChannelHandlerContext 
ctx, RaftClientReply localReply) {
       // get replies from the results, ignored exceptional replies
       final Stream<RaftClientReply> remoteReplies = results.stream()
           .filter(r -> !r.isCompletedExceptionally())
           .map(CompletableFuture::join)
           .map(this::getRaftClientReply);
   
       // choose the leader's reply if there is any.  Otherwise, use the local 
reply
       final RaftClientReply chosen = Stream.concat(Stream.of(localReply), 
remoteReplies)
           .filter(reply -> reply.getNotLeaderException() != null)
           .findAny().orElse(localReply);
   
       // send reply
       final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
       sendReplyNotSuccess(request, buffer, ctx);
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to