This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f778e2  RATIS-592. One-node Ratis writes fail forever after the first NotLeaderException or LeaderNotReadyException.  Contributed by Siddharth Wagle
5f778e2 is described below

commit 5f778e206fcb31aa292393519e51139e59240c68
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jun 28 16:32:53 2019 +0800

    RATIS-592. One-node Ratis writes fail forever after the first NotLeaderException or LeaderNotReadyException.  Contributed by Siddharth Wagle
---
 .../java/org/apache/ratis/client/impl/ClientProtoUtils.java | 12 ++++++++++++
 .../java/org/apache/ratis/client/impl/OrderedAsync.java     |  2 +-
 .../java/org/apache/ratis/client/impl/RaftClientImpl.java   | 13 +++++++------
 .../java/org/apache/ratis/client/impl/UnorderedAsync.java   |  2 +-
 .../org/apache/ratis/protocol/LeaderNotReadyException.java  |  9 ++++++---
 .../java/org/apache/ratis/protocol/RaftClientReply.java     |  7 ++++++-
 .../apache/ratis/grpc/client/GrpcClientProtocolClient.java  |  6 ++++++
 ratis-proto/src/main/proto/Raft.proto                       |  5 +++++
 .../java/org/apache/ratis/server/impl/RaftServerImpl.java   |  4 +++-
 .../ratis/server/impl/RaftReconfigurationBaseTest.java      |  9 ++++-----
 10 files changed, 51 insertions(+), 18 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index d7aca72..38977bc 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -26,6 +26,8 @@ import org.apache.ratis.util.ReflectionUtils;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto
+    .ExceptionDetailsCase.LEADERNOTREADYEXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
@@ -185,6 +187,13 @@ public interface ClientProtoUtils {
             .setLogIndex(nre.getLogIndex());
         b.setNotReplicatedException(nreBuilder);
       }
+
+      final LeaderNotReadyException lnre = reply.getLeaderNotReadyException();
+      if (lnre != null) {
+        LeaderNotReadyExceptionProto.Builder lnreBuilder = 
LeaderNotReadyExceptionProto.newBuilder()
+            .setRaftPeerId(lnre.getRaftPeerId().toByteString());
+        b.setLeaderNotReadyException(lnreBuilder);
+      }
     }
     return b.build();
   }
@@ -239,6 +248,9 @@ public interface ClientProtoUtils {
       e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()),
           smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
           smeProto.getStacktrace());
+    } else if 
(replyProto.getExceptionDetailsCase().equals(LEADERNOTREADYEXCEPTION)) {
+      LeaderNotReadyExceptionProto lnreProto = 
replyProto.getLeaderNotReadyException();
+      e = new 
LeaderNotReadyException(RaftPeerId.valueOf(lnreProto.getRaftPeerId()));
     } else {
       e = null;
     }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 35681b8..efd26a1 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -210,7 +210,7 @@ class OrderedAsync {
     return f.thenApply(reply -> {
       LOG.debug("{}: receive* {}", client.getId(), reply);
       final RaftException replyException = reply != null? 
reply.getException(): null;
-      reply = client.handleNotLeaderException(request, reply, 
this::resetSlidingWindow);
+      reply = client.handleLeaderException(request, reply, 
this::resetSlidingWindow);
       if (reply != null) {
         getSlidingWindow(request).receiveReply(
             request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetry);
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9798287..7a93cab 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -284,7 +284,7 @@ final class RaftClientImpl implements RaftClient {
       throw ioe;
     }
     LOG.debug("{}: receive {}", clientId, reply);
-    reply = handleNotLeaderException(request, reply, null);
+    reply = handleLeaderException(request, reply, null);
     reply = handleRaftException(reply, Function.identity());
     return reply;
   }
@@ -301,12 +301,13 @@ final class RaftClientImpl implements RaftClient {
   }
 
   /**
-   * @return null if the reply is null or it has {@link NotLeaderException};
-   *         otherwise return the same reply.
+   * @return null if the reply is null or it has
+   * {@link NotLeaderException} or {@link LeaderNotReadyException};
+   * otherwise return the same reply.
    */
-  RaftClientReply handleNotLeaderException(RaftClientRequest request, 
RaftClientReply reply,
-      Consumer<RaftClientRequest> handler) {
-    if (reply == null) {
+  RaftClientReply handleLeaderException(RaftClientRequest request, 
RaftClientReply reply,
+                                        Consumer<RaftClientRequest> handler) {
+    if (reply == null || reply.getException() instanceof 
LeaderNotReadyException) {
       return null;
     }
     final NotLeaderException nle = reply.getNotLeaderException();
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index b99e950..8dee190 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -76,7 +76,7 @@ public interface UnorderedAsync {
       try {
         LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount, 
reply);
         final RaftException replyException = reply != null? 
reply.getException(): null;
-        reply = client.handleNotLeaderException(request, reply, null);
+        reply = client.handleLeaderException(request, reply, null);
         if (reply != null) {
           f.complete(reply);
           return;
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
index 55af3f4..2b81958 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -24,11 +24,14 @@ package org.apache.ratis.protocol;
  * it cannot determine whether a request is just a retry.
  */
 public class LeaderNotReadyException extends RaftException {
+  private final RaftPeerId raftPeerId;
+
   public LeaderNotReadyException(RaftPeerId id) {
-    this(id + " is in LEADER state but not ready yet.");
+    super(id + " is in LEADER state but not ready yet.");
+    this.raftPeerId = id;
   }
 
-  public LeaderNotReadyException(String msg) {
-    super(msg);
+  public RaftPeerId getRaftPeerId() {
+    return raftPeerId;
   }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index f06a0e5..33bbcd0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -68,7 +68,8 @@ public class RaftClientReply extends RaftClientMessage {
           () -> "Inconsistent parameters: success && exception != null: " + 
this);
       Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
           AlreadyClosedException.class,
-          NotLeaderException.class, NotReplicatedException.class, 
StateMachineException.class),
+          NotLeaderException.class, NotReplicatedException.class,
+          LeaderNotReadyException.class, StateMachineException.class),
           () -> "Unexpected exception class: " + this);
     }
   }
@@ -151,6 +152,10 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, StateMachineException.class);
   }
 
+  public LeaderNotReadyException getLeaderNotReadyException() {
+    return JavaUtils.cast(exception, LeaderNotReadyException.class);
+  }
+
   /** @return the exception, if there is any; otherwise, return null. */
   public RaftException getException() {
     return exception;
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index e6a2c54..5a49cec 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -31,6 +31,7 @@ import 
org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.protocol.LeaderNotReadyException;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
@@ -257,6 +258,11 @@ public class GrpcClientProtocolClient implements Closeable 
{
             completeReplyExceptionally(nle, 
NotLeaderException.class.getName());
             return;
           }
+          final LeaderNotReadyException lnre = 
reply.getLeaderNotReadyException();
+          if (lnre != null) {
+            completeReplyExceptionally(lnre, 
NotLeaderException.class.getName());
+            return;
+          }
           handleReplyFuture(callId, f -> f.complete(reply));
         } catch (Throwable t) {
           handleReplyFuture(callId, f -> f.completeExceptionally(t));
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index 2d24a50..45b7851 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -269,6 +269,10 @@ message NotLeaderExceptionProto {
   repeated RaftPeerProto peersInConf = 2;
 }
 
+message LeaderNotReadyExceptionProto {
+  bytes raftPeerId = 1; // id of the peer
+}
+
 message NotReplicatedExceptionProto {
   uint64 callId = 1;
   ReplicationLevel replication = 2;
@@ -289,6 +293,7 @@ message RaftClientReplyProto {
     NotLeaderExceptionProto notLeaderException = 3;
     NotReplicatedExceptionProto notReplicatedException = 4;
     StateMachineExceptionProto stateMachineException = 5;
+    LeaderNotReadyExceptionProto leaderNotReadyException = 6;
   }
 
   uint64 logIndex = 14; // When the request is a write request and the reply 
is success, the log index of the transaction
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ec21a17..d7843d5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -439,7 +439,9 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
         return cacheEntry.getReplyFuture();
       }
-      return RetryCache.failWithException(new 
LeaderNotReadyException(getId()), entry);
+      final RaftClientReply reply = new RaftClientReply(request,
+          new LeaderNotReadyException(getId()), getCommitInfos());
+      return RetryCache.failWithReply(reply, entry);
     }
     return null;
   }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index ac022bf..4f456fa 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -575,16 +575,15 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
       new Thread(() -> {
         final RaftClient client = cluster.createClient(leaderId);
         final RaftClientRpc sender = client.getClientRpc();
-
         final RaftClientRequest request = cluster.newRaftClientRequest(
             client.getId(), leaderId, new SimpleMessage("test"));
         while (!success.get()) {
           try {
-            RaftClientReply reply = sender.sendRequest(request);
+            final RaftClientReply reply = sender.sendRequest(request);
             success.set(reply.isSuccess());
-          } catch (LeaderNotReadyException e) {
-            LOG.info("Hit LeaderNotReadyException", e);
-            caughtNotReady.set(true);
+            if (reply.getException() != null && reply.getException() 
instanceof LeaderNotReadyException) {
+              caughtNotReady.set(true);
+            }
           } catch (IOException e) {
             LOG.info("Hit other IOException", e);
           }

Reply via email to