Repository: incubator-ratis Updated Branches: refs/heads/master e4a016fb0 -> 86db875aa
RATIS-270. Requests with ReplicationLevel.ALL should not be replied to from the retry cache if their replies are delayed. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/86db875a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/86db875a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/86db875a Branch: refs/heads/master Commit: 86db875aa622fb5432c6048894eafc54c236a7c1 Parents: e4a016f Author: Mukul Kumar Singh <[email protected]> Authored: Sat Aug 11 17:50:37 2018 +0530 Committer: Mukul Kumar Singh <[email protected]> Committed: Sat Aug 11 17:50:37 2018 +0530 ---------------------------------------------------------------------- .../ratis/grpc/TestRetryCacheWithGrpc.java | 71 +++++++++++++++++++- .../apache/ratis/server/impl/LeaderState.java | 7 +- .../ratis/server/impl/PendingRequest.java | 39 ++++++++--- .../ratis/server/impl/PendingRequests.java | 11 +-- .../ratis/server/impl/RaftServerImpl.java | 11 ++- .../java/org/apache/ratis/MiniRaftCluster.java | 13 +++- .../java/org/apache/ratis/RetryCacheTests.java | 38 +++++------ 7 files changed, 149 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java index 956fd66..f577a48 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java @@ -18,12 +18,26 @@ package org.apache.ratis.grpc; import java.io.IOException; +import java.util.ArrayList; +import 
java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RetryCacheTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.util.LogUtils; import org.junit.Assert; +import org.junit.Test; public class TestRetryCacheWithGrpc extends RetryCacheTests { static { @@ -43,4 +57,59 @@ public class TestRetryCacheWithGrpc extends RetryCacheTests { return cluster; } -} + @Test + public void testAsyncRetryWithReplicatedAll() throws Exception { + final MiniRaftCluster cluster = getCluster(); + RaftTestUtil.waitForLeader(cluster); + + final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId(); + long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); + + // Kill a follower + final RaftPeerId killedFollower = cluster.getFollowers().get(0).getId(); + cluster.killServer(killedFollower); + + final long callId = 999; + final long seqNum = 111; + final ClientId clientId = ClientId.randomId(); + + // Retry with the same clientId and callId + final List<CompletableFuture<RaftClient>> futures = new ArrayList<>(); + futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster)); + futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster)); + + // restart the killed follower + cluster.restartServer(killedFollower, false); + for(CompletableFuture<RaftClient> f : futures) { + f.join().close(); + } + assertServer(cluster, clientId, callId, oldLastApplied); + } + + List<CompletableFuture<RaftClient>> 
sendRetry( + ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, MiniRaftCluster cluster) + throws Exception { + List<CompletableFuture<RaftClient>> futures = new ArrayList<>(); + final int numRequest = 3; + for (int i = 0; i < numRequest; i++) { + final RaftClient client = cluster.createClient(leaderId, cluster.getGroup(), clientId); + final RaftClientRpc rpc = client.getClientRpc(); + final RaftClientRequest request = cluster.newRaftClientRequest(client.getId(), leaderId, + callId, seqNum, new RaftTestUtil.SimpleMessage("message"), RaftProtos.ReplicationLevel.ALL); + + LOG.info("{} sendRequestAsync {}", i, request); + futures.add(rpc.sendRequestAsync(request) + .thenApply(reply -> assertReply(reply, client, callId))); + } + + for(CompletableFuture<RaftClient> f : futures) { + try { + f.get(200, TimeUnit.MILLISECONDS); + Assert.fail("It should timeout for ReplicationLevel.ALL since a follower is down"); + } catch(TimeoutException te) { + LOG.info("Expected " + te); + } + } + return futures; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 32b787f..a13284f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -573,10 +573,13 @@ public class LeaderState { return lists; } - void replyPendingRequest(long logIndex, RaftClientReply reply) { - if (!pendingRequests.replyPendingRequest(logIndex, reply)) { + /** @return true if the request is replied; otherwise, the reply is delayed, return false. 
*/ + boolean replyPendingRequest(long logIndex, RaftClientReply reply, RetryCache.CacheEntry cacheEntry) { + if (!pendingRequests.replyPendingRequest(logIndex, reply, cacheEntry)) { submitUpdateStateEvent(UPDATE_COMMIT_EVENT); + return false; } + return true; } TransactionContext getTransactionContext(long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index 4a72197..cdb283f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.protocol.*; +import org.apache.ratis.server.impl.RetryCache.CacheEntry; import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; @@ -26,15 +27,35 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; public class PendingRequest implements Comparable<PendingRequest> { + private static class DelayedReply { + private final RaftClientReply reply; + private final CacheEntry cacheEntry; + + DelayedReply(RaftClientReply reply, CacheEntry cacheEntry) { + this.reply = reply; + this.cacheEntry = cacheEntry; + } + + RaftClientReply getReply() { + cacheEntry.updateResult(reply); + return reply; + } + + RaftClientReply fail(NotReplicatedException e) { + final RaftClientReply failed = new RaftClientReply(reply, e); + cacheEntry.updateResult(failed); + return failed; + } + } + private final long index; private final RaftClientRequest request; private final TransactionContext entry; private final 
CompletableFuture<RaftClientReply> future; - private volatile RaftClientReply delayed; + private volatile DelayedReply delayed; - PendingRequest(long index, RaftClientRequest request, - TransactionContext entry) { + PendingRequest(long index, RaftClientRequest request, TransactionContext entry) { this.index = index; this.request = request; this.entry = entry; @@ -74,20 +95,20 @@ public class PendingRequest implements Comparable<PendingRequest> { future.complete(r); } - synchronized void setDelayedReply(RaftClientReply r) { + synchronized void setDelayedReply(RaftClientReply r, CacheEntry c) { Objects.requireNonNull(r); Preconditions.assertTrue(delayed == null); - delayed = r; + delayed = new DelayedReply(r, c); } synchronized void completeDelayedReply() { - setReply(delayed); + setReply(delayed.getReply()); } synchronized void failDelayedReply() { - final RaftClientRequest.Type type = request.getType(); - final ReplicationLevel replication = type.getWrite().getReplication(); - setReply(new RaftClientReply(delayed, new NotReplicatedException(request.getCallId(), replication, index))); + final ReplicationLevel replication = request.getType().getWrite().getReplication(); + final NotReplicatedException e = new NotReplicatedException(request.getCallId(), replication, index); + setReply(delayed.fail(e)); } TransactionContext setNotLeaderException(NotLeaderException nle) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index 2cf1271..92b3e96 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -18,10 +18,10 @@ package 
org.apache.ratis.server.impl; import org.apache.ratis.protocol.*; +import org.apache.ratis.server.impl.RetryCache.CacheEntry; import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,13 +84,13 @@ class PendingRequests { this.name = name + "-" + getClass().getSimpleName(); } - boolean delay(PendingRequest request, RaftClientReply reply) { + boolean delay(PendingRequest request, RaftClientReply reply, CacheEntry cacheEntry) { if (request.getIndex() <= allAckedIndex.get()) { return false; // delay is not required. } LOG.debug("{}: delay request {}", name, request); - request.setDelayedReply(reply); + request.setDelayedReply(reply, cacheEntry); final boolean offered; synchronized (q) { offered = q.offer(request); @@ -194,14 +194,15 @@ class PendingRequests { return pendingRequest != null ? pendingRequest.getEntry() : null; } - boolean replyPendingRequest(long index, RaftClientReply reply) { + /** @return true if the request is replied; otherwise, the reply is delayed, return false. 
*/ + boolean replyPendingRequest(long index, RaftClientReply reply, CacheEntry cacheEntry) { final PendingRequest pending = pendingRequests.remove(index); if (pending != null) { Preconditions.assertTrue(pending.getIndex() == index); final ReplicationLevel replication = pending.getRequest().getType().getWrite().getReplication(); if (replication == ReplicationLevel.ALL) { - if (delayedReplies.delay(pending, reply)) { + if (delayedReplies.delay(pending, reply, cacheEntry)) { return false; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3879f4a..f2114b9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1064,14 +1064,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou final StateMachineException e = new StateMachineException(getId(), exception); r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e, getCommitInfos()); } - // update retry cache - cacheEntry.updateResult(r); + // update pending request + boolean updateCache = true; // always update cache for follower synchronized (RaftServerImpl.this) { if (isLeader() && leaderState != null) { // is leader and is running - leaderState.replyPendingRequest(logEntry.getIndex(), r); + // For leader, update cache unless the reply is delayed. + // When a reply is delayed, the cache will be updated in DelayedReply.getReply(). 
+ updateCache = leaderState.replyPendingRequest(logEntry.getIndex(), r, cacheEntry); } } + if (updateCache) { + cacheEntry.updateResult(r); + } }); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 2634717..3806bb8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -39,7 +39,6 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -502,7 +501,12 @@ public abstract class MiniRaftCluster { } public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) { + return createClient(leaderId, group, null); + } + + public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, ClientId clientId) { return RaftClient.newBuilder() + .setClientId(clientId) .setRaftGroup(group) .setLeaderId(leaderId) .setProperties(properties) @@ -518,8 +522,13 @@ public abstract class MiniRaftCluster { public RaftClientRequest newRaftClientRequest( ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) { + return newRaftClientRequest(clientId, leaderId, callId, seqNum, message, ReplicationLevel.MAJORITY); + } + + public RaftClientRequest newRaftClientRequest( + ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message, ReplicationLevel replication) { return new RaftClientRequest(clientId, leaderId, getGroupId(), - callId, seqNum, message, RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY)); 
+ callId, seqNum, message, RaftClientRequest.writeRequestType(replication)); } public SetConfigurationRequest newSetConfigurationRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index 2d352b4..9fdb4f7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; @@ -80,18 +81,25 @@ public abstract class RetryCacheTests extends BaseTest { final long seqNum = 111; RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, new SimpleMessage("message")); - RaftClientReply reply = rpc.sendRequest(r); - Assert.assertEquals(callId, reply.getCallId()); - Assert.assertTrue(reply.isSuccess()); + assertReply(rpc.sendRequest(r), client, callId); // retry with the same callId for (int i = 0; i < 5; i++) { - reply = rpc.sendRequest(r); - Assert.assertEquals(client.getId(), reply.getClientId()); - Assert.assertEquals(callId, reply.getCallId()); - Assert.assertTrue(reply.isSuccess()); + assertReply(rpc.sendRequest(r), client, callId); } + assertServer(cluster, client.getId(), callId, oldLastApplied); + client.close(); + } + + public static RaftClient assertReply(RaftClientReply reply, RaftClient client, long callId) { + Assert.assertEquals(client.getId(), 
reply.getClientId()); + Assert.assertEquals(callId, reply.getCallId()); + Assert.assertTrue(reply.isSuccess()); + return client; + } + + public void assertServer(MiniRaftCluster cluster, ClientId clientId, long callId, long oldLastApplied) throws Exception { long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex(); // make sure retry cache has the entry for (RaftServerImpl server : cluster.iterateServerImpls()) { @@ -100,13 +108,10 @@ public abstract class RetryCacheTests extends BaseTest { Thread.sleep(1000); } Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server)); - Assert.assertNotNull( - RaftServerTestUtil.getRetryEntry(server, client.getId(), callId)); + Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId)); // make sure there is only one log entry committed - Assert.assertEquals(oldLastApplied + 1, - server.getState().getLastAppliedIndex()); + Assert.assertEquals(oldLastApplied + 1, server.getState().getLastAppliedIndex()); } - client.close(); } /** @@ -125,9 +130,7 @@ public abstract class RetryCacheTests extends BaseTest { final long seqNum = 111; RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, new SimpleMessage("message")); - RaftClientReply reply = rpc.sendRequest(r); - Assert.assertEquals(callId, reply.getCallId()); - Assert.assertTrue(reply.isSuccess()); + assertReply(rpc.sendRequest(r), client, callId); long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); // trigger the reconfiguration, make sure the original leader is kicked out @@ -146,11 +149,8 @@ public abstract class RetryCacheTests extends BaseTest { rpc.addServers(Arrays.asList(change.newPeers)); for (int i = 0; i < 10; i++) { try { - reply = rpc.sendRequest(r); + assertReply(rpc.sendRequest(r), client, callId); LOG.info("successfully sent out the retry request_" + i); - Assert.assertEquals(client.getId(), reply.getClientId()); - Assert.assertEquals(callId, 
reply.getCallId()); - Assert.assertTrue(reply.isSuccess()); } catch (Exception e) { LOG.info("hit exception while retrying the same request: " + r, e); }
