Repository: incubator-ratis
Updated Branches:
  refs/heads/master e4a016fb0 -> 86db875aa


RATIS-270. Replication ALL requests should not be replied from retry cache if 
they are delayed. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/86db875a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/86db875a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/86db875a

Branch: refs/heads/master
Commit: 86db875aa622fb5432c6048894eafc54c236a7c1
Parents: e4a016f
Author: Mukul Kumar Singh <msi...@apache.org>
Authored: Sat Aug 11 17:50:37 2018 +0530
Committer: Mukul Kumar Singh <msi...@apache.org>
Committed: Sat Aug 11 17:50:37 2018 +0530

----------------------------------------------------------------------
 .../ratis/grpc/TestRetryCacheWithGrpc.java      | 71 +++++++++++++++++++-
 .../apache/ratis/server/impl/LeaderState.java   |  7 +-
 .../ratis/server/impl/PendingRequest.java       | 39 ++++++++---
 .../ratis/server/impl/PendingRequests.java      | 11 +--
 .../ratis/server/impl/RaftServerImpl.java       | 11 ++-
 .../java/org/apache/ratis/MiniRaftCluster.java  | 13 +++-
 .../java/org/apache/ratis/RetryCacheTests.java  | 38 +++++------
 7 files changed, 149 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index 956fd66..f577a48 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -18,12 +18,26 @@
 package org.apache.ratis.grpc;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.util.LogUtils;
 import org.junit.Assert;
+import org.junit.Test;
 
 public class TestRetryCacheWithGrpc extends RetryCacheTests {
   static {
@@ -43,4 +57,59 @@ public class TestRetryCacheWithGrpc extends RetryCacheTests {
     return cluster;
   }
 
-}
+  @Test
+  public void testAsyncRetryWithReplicatedAll() throws Exception {
+    final MiniRaftCluster cluster = getCluster();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId();
+    long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+    // Kill a follower
+    final RaftPeerId killedFollower = cluster.getFollowers().get(0).getId();
+    cluster.killServer(killedFollower);
+
+    final long callId = 999;
+    final long seqNum = 111;
+    final ClientId clientId = ClientId.randomId();
+
+    // Retry with the same clientId and callId
+    final List<CompletableFuture<RaftClient>> futures = new ArrayList<>();
+    futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster));
+    futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster));
+
+    // restart the killed follower
+    cluster.restartServer(killedFollower, false);
+    for(CompletableFuture<RaftClient> f : futures) {
+      f.join().close();
+    }
+    assertServer(cluster, clientId, callId, oldLastApplied);
+  }
+
+  List<CompletableFuture<RaftClient>> sendRetry(
+      ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, 
MiniRaftCluster cluster)
+      throws Exception {
+    List<CompletableFuture<RaftClient>> futures = new ArrayList<>();
+    final int numRequest = 3;
+    for (int i = 0; i < numRequest; i++) {
+      final RaftClient client = cluster.createClient(leaderId, 
cluster.getGroup(), clientId);
+      final RaftClientRpc rpc = client.getClientRpc();
+      final RaftClientRequest request = 
cluster.newRaftClientRequest(client.getId(), leaderId,
+          callId, seqNum, new RaftTestUtil.SimpleMessage("message"), 
RaftProtos.ReplicationLevel.ALL);
+
+      LOG.info("{} sendRequestAsync {}", i, request);
+      futures.add(rpc.sendRequestAsync(request)
+          .thenApply(reply -> assertReply(reply, client, callId)));
+    }
+
+    for(CompletableFuture<RaftClient> f : futures) {
+      try {
+        f.get(200, TimeUnit.MILLISECONDS);
+        Assert.fail("It should timeout for ReplicationLevel.ALL since a 
follower is down");
+      } catch(TimeoutException te) {
+        LOG.info("Expected " + te);
+      }
+    }
+    return futures;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 32b787f..a13284f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -573,10 +573,13 @@ public class LeaderState {
     return lists;
   }
 
-  void replyPendingRequest(long logIndex, RaftClientReply reply) {
-    if (!pendingRequests.replyPendingRequest(logIndex, reply)) {
+  /** @return true if the request is replied; otherwise, the reply is delayed, 
return false. */
+  boolean replyPendingRequest(long logIndex, RaftClientReply reply, 
RetryCache.CacheEntry cacheEntry) {
+    if (!pendingRequests.replyPendingRequest(logIndex, reply, cacheEntry)) {
       submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
+      return false;
     }
+    return true;
   }
 
   TransactionContext getTransactionContext(long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 4a72197..cdb283f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.impl.RetryCache.CacheEntry;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
@@ -26,15 +27,35 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 public class PendingRequest implements Comparable<PendingRequest> {
+  private static class DelayedReply {
+    private final RaftClientReply reply;
+    private final CacheEntry cacheEntry;
+
+    DelayedReply(RaftClientReply reply, CacheEntry cacheEntry) {
+      this.reply = reply;
+      this.cacheEntry = cacheEntry;
+    }
+
+    RaftClientReply getReply() {
+      cacheEntry.updateResult(reply);
+      return reply;
+    }
+
+    RaftClientReply fail(NotReplicatedException e) {
+      final RaftClientReply failed = new RaftClientReply(reply, e);
+      cacheEntry.updateResult(failed);
+      return failed;
+    }
+  }
+
   private final long index;
   private final RaftClientRequest request;
   private final TransactionContext entry;
   private final CompletableFuture<RaftClientReply> future;
 
-  private volatile RaftClientReply delayed;
+  private volatile DelayedReply delayed;
 
-  PendingRequest(long index, RaftClientRequest request,
-                 TransactionContext entry) {
+  PendingRequest(long index, RaftClientRequest request, TransactionContext 
entry) {
     this.index = index;
     this.request = request;
     this.entry = entry;
@@ -74,20 +95,20 @@ public class PendingRequest implements 
Comparable<PendingRequest> {
     future.complete(r);
   }
 
-  synchronized void setDelayedReply(RaftClientReply r) {
+  synchronized void setDelayedReply(RaftClientReply r, CacheEntry c) {
     Objects.requireNonNull(r);
     Preconditions.assertTrue(delayed == null);
-    delayed = r;
+    delayed = new DelayedReply(r, c);
   }
 
   synchronized void completeDelayedReply() {
-    setReply(delayed);
+    setReply(delayed.getReply());
   }
 
   synchronized void failDelayedReply() {
-    final RaftClientRequest.Type type = request.getType();
-    final ReplicationLevel replication = type.getWrite().getReplication();
-    setReply(new RaftClientReply(delayed, new 
NotReplicatedException(request.getCallId(), replication, index)));
+    final ReplicationLevel replication = 
request.getType().getWrite().getReplication();
+    final NotReplicatedException e = new 
NotReplicatedException(request.getCallId(), replication, index);
+    setReply(delayed.fail(e));
   }
 
   TransactionContext setNotLeaderException(NotLeaderException nle) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 2cf1271..92b3e96 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -18,10 +18,10 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.impl.RetryCache.CacheEntry;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,13 +84,13 @@ class PendingRequests {
       this.name = name + "-" + getClass().getSimpleName();
     }
 
-    boolean delay(PendingRequest request, RaftClientReply reply) {
+    boolean delay(PendingRequest request, RaftClientReply reply, CacheEntry 
cacheEntry) {
       if (request.getIndex() <= allAckedIndex.get()) {
         return false; // delay is not required.
       }
 
       LOG.debug("{}: delay request {}", name, request);
-      request.setDelayedReply(reply);
+      request.setDelayedReply(reply, cacheEntry);
       final boolean offered;
       synchronized (q) {
         offered = q.offer(request);
@@ -194,14 +194,15 @@ class PendingRequests {
     return pendingRequest != null ? pendingRequest.getEntry() : null;
   }
 
-  boolean replyPendingRequest(long index, RaftClientReply reply) {
+  /** @return true if the request is replied; otherwise, the reply is delayed, 
return false. */
+  boolean replyPendingRequest(long index, RaftClientReply reply, CacheEntry 
cacheEntry) {
     final PendingRequest pending = pendingRequests.remove(index);
     if (pending != null) {
       Preconditions.assertTrue(pending.getIndex() == index);
 
       final ReplicationLevel replication = 
pending.getRequest().getType().getWrite().getReplication();
       if (replication == ReplicationLevel.ALL) {
-        if (delayedReplies.delay(pending, reply)) {
+        if (delayedReplies.delay(pending, reply, cacheEntry)) {
           return false;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3879f4a..f2114b9 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1064,14 +1064,19 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         final StateMachineException e = new StateMachineException(getId(), 
exception);
         r = new RaftClientReply(clientId, serverId, groupId, callId, false, 
null, e, getCommitInfos());
       }
-      // update retry cache
-      cacheEntry.updateResult(r);
+
       // update pending request
+      boolean updateCache = true;  // always update cache for follower
       synchronized (RaftServerImpl.this) {
         if (isLeader() && leaderState != null) { // is leader and is running
-          leaderState.replyPendingRequest(logEntry.getIndex(), r);
+          // For leader, update cache unless the reply is delayed.
+          // When a reply is delayed, the cache will be updated in 
DelayedReply.getReply().
+          updateCache = leaderState.replyPendingRequest(logEntry.getIndex(), 
r, cacheEntry);
         }
       }
+      if (updateCache) {
+        cacheEntry.updateResult(r);
+      }
     });
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 2634717..3806bb8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -39,7 +39,6 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -502,7 +501,12 @@ public abstract class MiniRaftCluster {
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
+    return createClient(leaderId, group, null);
+  }
+
+  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, 
ClientId clientId) {
     return RaftClient.newBuilder()
+        .setClientId(clientId)
         .setRaftGroup(group)
         .setLeaderId(leaderId)
         .setProperties(properties)
@@ -518,8 +522,13 @@ public abstract class MiniRaftCluster {
 
   public RaftClientRequest newRaftClientRequest(
       ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, 
Message message) {
+    return newRaftClientRequest(clientId, leaderId, callId, seqNum, message, 
ReplicationLevel.MAJORITY);
+  }
+
+  public RaftClientRequest newRaftClientRequest(
+      ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, 
Message message, ReplicationLevel replication) {
     return new RaftClientRequest(clientId, leaderId, getGroupId(),
-        callId, seqNum, message, 
RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY));
+        callId, seqNum, message, 
RaftClientRequest.writeRequestType(replication));
   }
 
   public SetConfigurationRequest newSetConfigurationRequest(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 2d352b4..9fdb4f7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -80,18 +81,25 @@ public abstract class RetryCacheTests extends BaseTest {
     final long seqNum = 111;
     RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), 
leaderId,
         callId, seqNum, new SimpleMessage("message"));
-    RaftClientReply reply = rpc.sendRequest(r);
-    Assert.assertEquals(callId, reply.getCallId());
-    Assert.assertTrue(reply.isSuccess());
+    assertReply(rpc.sendRequest(r), client, callId);
 
     // retry with the same callId
     for (int i = 0; i < 5; i++) {
-      reply = rpc.sendRequest(r);
-      Assert.assertEquals(client.getId(), reply.getClientId());
-      Assert.assertEquals(callId, reply.getCallId());
-      Assert.assertTrue(reply.isSuccess());
+      assertReply(rpc.sendRequest(r), client, callId);
     }
 
+    assertServer(cluster, client.getId(), callId, oldLastApplied);
+    client.close();
+  }
+
+  public static RaftClient assertReply(RaftClientReply reply, RaftClient 
client, long callId) {
+    Assert.assertEquals(client.getId(), reply.getClientId());
+    Assert.assertEquals(callId, reply.getCallId());
+    Assert.assertTrue(reply.isSuccess());
+    return client;
+  }
+
+  public void assertServer(MiniRaftCluster cluster, ClientId clientId, long 
callId, long oldLastApplied) throws Exception {
     long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
     // make sure retry cache has the entry
     for (RaftServerImpl server : cluster.iterateServerImpls()) {
@@ -100,13 +108,10 @@ public abstract class RetryCacheTests extends BaseTest {
         Thread.sleep(1000);
       }
       Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
-      Assert.assertNotNull(
-          RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+      Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, 
callId));
       // make sure there is only one log entry committed
-      Assert.assertEquals(oldLastApplied + 1,
-          server.getState().getLastAppliedIndex());
+      Assert.assertEquals(oldLastApplied + 1, 
server.getState().getLastAppliedIndex());
     }
-    client.close();
   }
 
   /**
@@ -125,9 +130,7 @@ public abstract class RetryCacheTests extends BaseTest {
     final long seqNum = 111;
     RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), 
leaderId,
         callId, seqNum, new SimpleMessage("message"));
-    RaftClientReply reply = rpc.sendRequest(r);
-    Assert.assertEquals(callId, reply.getCallId());
-    Assert.assertTrue(reply.isSuccess());
+    assertReply(rpc.sendRequest(r), client, callId);
     long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
 
     // trigger the reconfiguration, make sure the original leader is kicked out
@@ -146,11 +149,8 @@ public abstract class RetryCacheTests extends BaseTest {
     rpc.addServers(Arrays.asList(change.newPeers));
     for (int i = 0; i < 10; i++) {
       try {
-        reply = rpc.sendRequest(r);
+        assertReply(rpc.sendRequest(r), client, callId);
         LOG.info("successfully sent out the retry request_" + i);
-        Assert.assertEquals(client.getId(), reply.getClientId());
-        Assert.assertEquals(callId, reply.getCallId());
-        Assert.assertTrue(reply.isSuccess());
       } catch (Exception e) {
         LOG.info("hit exception while retrying the same request: " + r, e);
       }

Reply via email to