Repository: incubator-ratis Updated Branches: refs/heads/master 1b753aeba -> ccc380119
RATIS-233. Throw an exception for the delayed requests if the leader is stepping down. Contributed by Kit Hui Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ccc38011 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ccc38011 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ccc38011 Branch: refs/heads/master Commit: ccc380119edb31631943b91b1f2f0fa70fc3bf86 Parents: 1b753ae Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Jun 14 17:32:10 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Jun 14 17:32:10 2018 +0800 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 17 ++++++- .../ratis/protocol/NotReplicatedException.java | 46 +++++++++++++++++++ .../apache/ratis/protocol/RaftClientReply.java | 17 +++++-- .../ratis/protocol/RaftClientRequest.java | 3 +- ratis-proto-shaded/src/main/proto/Raft.proto | 9 +++- .../ratis/server/impl/PendingRequest.java | 12 +++-- .../ratis/server/impl/PendingRequests.java | 10 +++++ .../java/org/apache/ratis/RaftAsyncTests.java | 8 ++++ .../java/org/apache/ratis/RaftBasicTests.java | 47 ++++++++++++++++++++ 9 files changed, 158 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 4d858e4..f32d6b9 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.ratis.util.ReflectionUtils; import java.util.Arrays; import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION; +import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION; import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION; public interface ClientProtoUtils { @@ -163,6 +164,15 @@ public interface ClientProtoUtils { .setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace())); b.setStateMachineException(smeBuilder.build()); } + + final NotReplicatedException nre = reply.getNotReplicatedException(); + if (nre != null) { + final NotReplicatedExceptionProto.Builder nreBuilder = NotReplicatedExceptionProto.newBuilder() + .setCallId(nre.getCallId()) + .setReplication(nre.getRequiredReplication()) + .setLogIndex(nre.getLogIndex()); + b.setNotReplicatedException(nreBuilder); + } } return b.build(); } @@ -186,7 +196,7 @@ public interface ClientProtoUtils { static RaftClientReply toRaftClientReply( RaftClientReplyProto replyProto) { final RaftRpcReplyProto rp = replyProto.getRpcReply(); - RaftException e = null; + final RaftException e; if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) { NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException(); final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ? @@ -195,11 +205,16 @@ public interface ClientProtoUtils { nleProto.getPeersInConfList()); e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()), suggestedLeader, peers); + } else if (replyProto.getExceptionDetailsCase() == NOTREPLICATEDEXCEPTION) { + final NotReplicatedExceptionProto nre = replyProto.getNotReplicatedException(); + e = new NotReplicatedException(nre.getCallId(), nre.getReplication(), nre.getLogIndex()); } else if (replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) { StateMachineExceptionProto smeProto = replyProto.getStateMachineException(); e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()), smeProto.getExceptionClassName(), smeProto.getErrorMsg(), smeProto.getStacktrace()); + } else { + e = null; } ClientId clientId = ClientId.valueOf(rp.getRequestorId()); final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java new file mode 100644 index 0000000..67bda34 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; + +public class NotReplicatedException extends RaftException { + private final long callId; + private final ReplicationLevel requiredReplication; + private final long logIndex; + + public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex) { + super("Request with call Id " + callId + " is committed with log index " + logIndex + + " but not yet replicated to " + requiredReplication); + this.callId = callId; + this.requiredReplication = requiredReplication; + this.logIndex = logIndex; + } + + public long getCallId() { + return callId; + } + + public ReplicationLevel getRequiredReplication() { + return requiredReplication; + } + + public long getLogIndex() { + return logIndex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 8254ab4..ba3cdc7 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -55,16 +55,15 @@ public class RaftClientReply extends RaftClientMessage { this.callId = callId; this.message = message; this.exception = exception; + this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList(); if (exception != null) { Preconditions.assertTrue(!success, () -> "Inconsistent parameters: success && exception != null: " + this); - Preconditions.assertTrue( - ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class), + Preconditions.assertTrue(ReflectionUtils.isInstance(exception, + NotLeaderException.class, NotReplicatedException.class, StateMachineException.class), () -> "Unexpected exception class: " + this); } - - this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList(); } public RaftClientReply(RaftClientRequest request, RaftException exception, Collection<CommitInfoProto> commitInfos) { @@ -81,6 +80,11 @@ public class RaftClientReply extends RaftClientMessage { request.getCallId(), true, message, null, commitInfos); } + public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) { + this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(), + reply.getCallId(), false, reply.getMessage(), nre, reply.getCommitInfos()); + } + /** * Get the commit information for the entire group. * The commit information may be unavailable for exception reply. @@ -120,6 +124,11 @@ public class RaftClientReply extends RaftClientMessage { return JavaUtils.cast(exception, NotLeaderException.class); } + /** If this reply has {@link NotReplicatedException}, return it; otherwise return null. */ + public NotReplicatedException getNotReplicatedException() { + return JavaUtils.cast(exception, NotReplicatedException.class); + } + /** If this reply has {@link StateMachineException}, return it; otherwise return null. */ public StateMachineException getStateMachineException() { return JavaUtils.cast(exception, StateMachineException.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 232c51d..34c96f3 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -122,7 +122,8 @@ public class RaftClientRequest extends RaftClientMessage { public String toString() { switch (typeCase) { case WRITE: - return "RW"; + final ReplicationLevel replication = write.getReplication(); + return "RW" + (replication == ReplicationLevel.MAJORITY? "": "-" + replication); case READ: return "RO"; case STALEREAD: http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 3f0baf8..303ec2b 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -203,6 +203,12 @@ message NotLeaderExceptionProto { repeated RaftPeerProto peersInConf = 2; } +message NotReplicatedExceptionProto { + uint64 callId = 1; + ReplicationLevel replication = 2; + uint64 logIndex = 3; +} + message StateMachineExceptionProto { string exceptionClassName = 1; string errorMsg = 2; @@ -215,7 +221,8 @@ message RaftClientReplyProto { oneof ExceptionDetails { NotLeaderExceptionProto notLeaderException = 3; - StateMachineExceptionProto stateMachineException = 4; + NotReplicatedExceptionProto notReplicatedException = 4; + StateMachineExceptionProto stateMachineException = 5; } repeated CommitInfoProto commitInfos = 15; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index 10cd95f..4a72197 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -17,10 +17,8 @@ */ package org.apache.ratis.server.impl; -import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; @@ -86,6 +84,12 @@ public class PendingRequest implements Comparable<PendingRequest> { setReply(delayed); } + synchronized void failDelayedReply() { + final RaftClientRequest.Type type = request.getType(); + final ReplicationLevel replication = type.getWrite().getReplication(); + setReply(new RaftClientReply(delayed, new NotReplicatedException(request.getCallId(), replication, index))); + } + TransactionContext setNotLeaderException(NotLeaderException nle) { setReply(new RaftClientReply(getRequest(), nle, null)); return getEntry(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index c02d7c3..2cf1271 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -116,9 +116,18 @@ class PendingRequests { polled = q.poll(); Preconditions.assertTrue(polled == peeked); } + LOG.debug("{}: complete delay request {}", name, polled); polled.completeDelayedReply(); } } + + void failReplies() { + synchronized (q) { + for(; !q.isEmpty();) { + q.poll().failDelayedReply(); + } + } + } } private PendingRequest pendingSetConf; @@ -215,6 +224,7 @@ class PendingRequests { if (pendingSetConf != null) { pendingSetConf.setNotLeaderException(nle); } + delayedReplies.failReplies(); } void checkDelayedReplies(long allAckedIndex) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 53e0f1f..b297a27 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.ratis.RaftBasicTests.runTestDelayRequestIfLeaderStepDown; import static org.apache.ratis.RaftTestUtil.waitForLeader; public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest @@ -309,4 +310,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba //reset for the other tests RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime); } + + @Test + public void testAsyncDelayRequestIfLeaderStepDown() throws Exception { + final CLUSTER cluster = newCluster(5); + cluster.start(); + runTestDelayRequestIfLeaderStepDown(true, cluster, LOG); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 207a458..8c744ab 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; @@ -50,6 +51,7 @@ import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -451,4 +453,49 @@ public abstract class RaftBasicTests extends BaseTest { Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0); } } + + @Test + public void testDelayRequestIfLeaderStepDown() throws Exception { + runTestDelayRequestIfLeaderStepDown(false, getCluster(), LOG); + } + + static void runTestDelayRequestIfLeaderStepDown(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception { + boolean skipfirstserver = false; + for (RaftServer s : cluster.getServers()) { + if (!skipfirstserver) { + skipfirstserver = true; + cluster.killServer(s.getId()); + continue; + } + cluster.restartServer(s.getId(), false); + } + final RaftServerImpl leader = waitForLeader(cluster); + LOG.info("leader: " + leader.getId() + ", " + cluster.printServers()); + + final SimpleMessage message = SimpleMessage.create(1)[0]; + try (final RaftClient client = cluster.createClientWithLeader()) { + final RaftClientReply reply; + if (async) { + final CompletableFuture<RaftClientReply> f = client.sendAsync(message, ReplicationLevel.ALL); + Thread.sleep(1000); + RaftTestUtil.changeLeader(cluster, leader.getId()); + + reply = f.get(); + } else { + new Thread(() -> { + try { + Thread.sleep(1000); + RaftTestUtil.changeLeader(cluster, leader.getId()); + } catch (Exception e) { + LOG.warn("changeLeader", e); + } + }).start(); + + reply = client.send(message, ReplicationLevel.ALL); + } + throw reply.getNotReplicatedException(); + } catch (NotReplicatedException e) { + LOG.info("Expected", e); + } + } }
