This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5f778e2 RATIS-592. One node ratis writes fail forever after first
NotLeaderException or LeaderNotReadyException. Contributed by Siddharth Wagle
5f778e2 is described below
commit 5f778e206fcb31aa292393519e51139e59240c68
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jun 28 16:32:53 2019 +0800
RATIS-592. One node ratis writes fail forever after first
NotLeaderException or LeaderNotReadyException. Contributed by Siddharth Wagle
---
.../java/org/apache/ratis/client/impl/ClientProtoUtils.java | 12 ++++++++++++
.../java/org/apache/ratis/client/impl/OrderedAsync.java | 2 +-
.../java/org/apache/ratis/client/impl/RaftClientImpl.java | 13 +++++++------
.../java/org/apache/ratis/client/impl/UnorderedAsync.java | 2 +-
.../org/apache/ratis/protocol/LeaderNotReadyException.java | 9 ++++++---
.../java/org/apache/ratis/protocol/RaftClientReply.java | 7 ++++++-
.../apache/ratis/grpc/client/GrpcClientProtocolClient.java | 6 ++++++
ratis-proto/src/main/proto/Raft.proto | 5 +++++
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 4 +++-
.../ratis/server/impl/RaftReconfigurationBaseTest.java | 9 ++++-----
10 files changed, 51 insertions(+), 18 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index d7aca72..38977bc 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -26,6 +26,8 @@ import org.apache.ratis.util.ReflectionUtils;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto
+ .ExceptionDetailsCase.LEADERNOTREADYEXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
@@ -185,6 +187,13 @@ public interface ClientProtoUtils {
.setLogIndex(nre.getLogIndex());
b.setNotReplicatedException(nreBuilder);
}
+
+ final LeaderNotReadyException lnre = reply.getLeaderNotReadyException();
+ if (lnre != null) {
+ LeaderNotReadyExceptionProto.Builder lnreBuilder =
LeaderNotReadyExceptionProto.newBuilder()
+ .setRaftPeerId(lnre.getRaftPeerId().toByteString());
+ b.setLeaderNotReadyException(lnreBuilder);
+ }
}
return b.build();
}
@@ -239,6 +248,9 @@ public interface ClientProtoUtils {
e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()),
smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
smeProto.getStacktrace());
+ } else if
(replyProto.getExceptionDetailsCase().equals(LEADERNOTREADYEXCEPTION)) {
+ LeaderNotReadyExceptionProto lnreProto =
replyProto.getLeaderNotReadyException();
+ e = new
LeaderNotReadyException(RaftPeerId.valueOf(lnreProto.getRaftPeerId()));
} else {
e = null;
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 35681b8..efd26a1 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -210,7 +210,7 @@ class OrderedAsync {
return f.thenApply(reply -> {
LOG.debug("{}: receive* {}", client.getId(), reply);
final RaftException replyException = reply != null?
reply.getException(): null;
- reply = client.handleNotLeaderException(request, reply,
this::resetSlidingWindow);
+ reply = client.handleLeaderException(request, reply,
this::resetSlidingWindow);
if (reply != null) {
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply,
this::sendRequestWithRetry);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9798287..7a93cab 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -284,7 +284,7 @@ final class RaftClientImpl implements RaftClient {
throw ioe;
}
LOG.debug("{}: receive {}", clientId, reply);
- reply = handleNotLeaderException(request, reply, null);
+ reply = handleLeaderException(request, reply, null);
reply = handleRaftException(reply, Function.identity());
return reply;
}
@@ -301,12 +301,13 @@ final class RaftClientImpl implements RaftClient {
}
/**
- * @return null if the reply is null or it has {@link NotLeaderException};
- * otherwise return the same reply.
+ * @return null if the reply is null or it has a
+ * {@link NotLeaderException} or a {@link LeaderNotReadyException};
+ * otherwise return the same reply.
*/
- RaftClientReply handleNotLeaderException(RaftClientRequest request,
RaftClientReply reply,
- Consumer<RaftClientRequest> handler) {
- if (reply == null) {
+ RaftClientReply handleLeaderException(RaftClientRequest request,
RaftClientReply reply,
+ Consumer<RaftClientRequest> handler) {
+ if (reply == null || reply.getException() instanceof
LeaderNotReadyException) {
return null;
}
final NotLeaderException nle = reply.getNotLeaderException();
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index b99e950..8dee190 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -76,7 +76,7 @@ public interface UnorderedAsync {
try {
LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount,
reply);
final RaftException replyException = reply != null?
reply.getException(): null;
- reply = client.handleNotLeaderException(request, reply, null);
+ reply = client.handleLeaderException(request, reply, null);
if (reply != null) {
f.complete(reply);
return;
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
index 55af3f4..2b81958 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -24,11 +24,14 @@ package org.apache.ratis.protocol;
* it cannot determine whether a request is just a retry.
*/
public class LeaderNotReadyException extends RaftException {
+ private final RaftPeerId raftPeerId;
+
public LeaderNotReadyException(RaftPeerId id) {
- this(id + " is in LEADER state but not ready yet.");
+ super(id + " is in LEADER state but not ready yet.");
+ this.raftPeerId = id;
}
- public LeaderNotReadyException(String msg) {
- super(msg);
+ public RaftPeerId getRaftPeerId() {
+ return raftPeerId;
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index f06a0e5..33bbcd0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -68,7 +68,8 @@ public class RaftClientReply extends RaftClientMessage {
() -> "Inconsistent parameters: success && exception != null: " +
this);
Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
AlreadyClosedException.class,
- NotLeaderException.class, NotReplicatedException.class,
StateMachineException.class),
+ NotLeaderException.class, NotReplicatedException.class,
+ LeaderNotReadyException.class, StateMachineException.class),
() -> "Unexpected exception class: " + this);
}
}
@@ -151,6 +152,10 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, StateMachineException.class);
}
+ public LeaderNotReadyException getLeaderNotReadyException() {
+ return JavaUtils.cast(exception, LeaderNotReadyException.class);
+ }
+
/** @return the exception, if there is any; otherwise, return null. */
public RaftException getException() {
return exception;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index e6a2c54..5a49cec 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -31,6 +31,7 @@ import
org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.protocol.LeaderNotReadyException;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
@@ -257,6 +258,11 @@ public class GrpcClientProtocolClient implements Closeable
{
completeReplyExceptionally(nle,
NotLeaderException.class.getName());
return;
}
+ final LeaderNotReadyException lnre =
reply.getLeaderNotReadyException();
+ if (lnre != null) {
+ completeReplyExceptionally(lnre,
NotLeaderException.class.getName());
+ return;
+ }
handleReplyFuture(callId, f -> f.complete(reply));
} catch (Throwable t) {
handleReplyFuture(callId, f -> f.completeExceptionally(t));
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 2d24a50..45b7851 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -269,6 +269,10 @@ message NotLeaderExceptionProto {
repeated RaftPeerProto peersInConf = 2;
}
+message LeaderNotReadyExceptionProto {
+ bytes raftPeerId = 1; // id of the peer that is in LEADER state but not ready yet
+}
+
message NotReplicatedExceptionProto {
uint64 callId = 1;
ReplicationLevel replication = 2;
@@ -289,6 +293,7 @@ message RaftClientReplyProto {
NotLeaderExceptionProto notLeaderException = 3;
NotReplicatedExceptionProto notReplicatedException = 4;
StateMachineExceptionProto stateMachineException = 5;
+ LeaderNotReadyExceptionProto leaderNotReadyException = 6;
}
uint64 logIndex = 14; // When the request is a write request and the reply
is success, the log index of the transaction
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ec21a17..d7843d5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -439,7 +439,9 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
return cacheEntry.getReplyFuture();
}
- return RetryCache.failWithException(new
LeaderNotReadyException(getId()), entry);
+ final RaftClientReply reply = new RaftClientReply(request,
+ new LeaderNotReadyException(getId()), getCommitInfos());
+ return RetryCache.failWithReply(reply, entry);
}
return null;
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index ac022bf..4f456fa 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -575,16 +575,15 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
new Thread(() -> {
final RaftClient client = cluster.createClient(leaderId);
final RaftClientRpc sender = client.getClientRpc();
-
final RaftClientRequest request = cluster.newRaftClientRequest(
client.getId(), leaderId, new SimpleMessage("test"));
while (!success.get()) {
try {
- RaftClientReply reply = sender.sendRequest(request);
+ final RaftClientReply reply = sender.sendRequest(request);
success.set(reply.isSuccess());
- } catch (LeaderNotReadyException e) {
- LOG.info("Hit LeaderNotReadyException", e);
- caughtNotReady.set(true);
+ if (reply.getException() != null && reply.getException()
instanceof LeaderNotReadyException) {
+ caughtNotReady.set(true);
+ }
} catch (IOException e) {
LOG.info("Hit other IOException", e);
}