Repository: incubator-ratis
Updated Branches:
  refs/heads/master 00f64b4c1 -> c692bf201


RATIS-208. Allow client to specify replication level in a request.  Contributed by Kit Hui


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c692bf20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c692bf20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c692bf20

Branch: refs/heads/master
Commit: c692bf201d01a67e2c85efcd625e0bf80c96758b
Parents: 00f64b4
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Tue Apr 10 19:10:30 2018 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Tue Apr 10 19:10:30 2018 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     | 23 ++++++-
 .../ratis/client/impl/RaftClientImpl.java       |  9 +--
 .../ratis/protocol/RaftClientRequest.java       | 23 ++++---
 ratis-proto-shaded/src/main/proto/Raft.proto    |  6 ++
 .../apache/ratis/server/impl/LeaderState.java   | 49 ++++++++------
 .../ratis/server/impl/PendingRequest.java       | 18 +++++-
 .../ratis/server/impl/PendingRequests.java      | 67 +++++++++++++++++++-
 .../java/org/apache/ratis/MiniRaftCluster.java  |  6 +-
 .../java/org/apache/ratis/RaftAsyncTests.java   | 18 +++++-
 .../java/org/apache/ratis/RaftBasicTests.java   | 39 ++++++++++--
 10 files changed, 214 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 84fec9e..5562f59 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +46,17 @@ public interface RaftClient extends Closeable {
    * Async call to send the given message to the raft service.
    * The message may change the state of the service.
    * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead.
+   *
+   * @param message The request message.
+   * @param replication The replication level required.
+   * @return a future of the reply.
    */
-  CompletableFuture<RaftClientReply> sendAsync(Message message);
+  CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication);
+
+  /** The same as sendAsync(message, MAJORITY). */
+  default CompletableFuture<RaftClientReply> sendAsync(Message message) {
+    return sendAsync(message, ReplicationLevel.MAJORITY);
+  }
 
   /** Async call to send the given readonly message to the raft service. */
   CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
@@ -58,8 +68,17 @@ public interface RaftClient extends Closeable {
    * Send the given message to the raft service.
    * The message may change the state of the service.
    * For readonly messages, use {@link #sendReadOnly(Message)} instead.
+   *
+   * @param message The request message.
+   * @param replication The replication level required.
+   * @return the reply.
    */
-  RaftClientReply send(Message message) throws IOException;
+  RaftClientReply send(Message message, ReplicationLevel replication) throws IOException;
+
+  /** The same as send(message, MAJORITY). */
+  default RaftClientReply send(Message message) throws IOException {
+    return send(message, ReplicationLevel.MAJORITY);
+  }
 
   /** Send the given readonly message to the raft service. */
   RaftClientReply sendReadOnly(Message message) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f0abb10..e8a897b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.*;
 
 import java.io.IOException;
@@ -129,8 +130,8 @@ final class RaftClientImpl implements RaftClient {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> sendAsync(Message message) {
-    return sendAsync(RaftClientRequest.writeRequestType(), message, null);
+  public CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication) {
+    return sendAsync(RaftClientRequest.writeRequestType(replication), message, null);
   }
 
   @Override
@@ -168,8 +169,8 @@ final class RaftClientImpl implements RaftClient {
   }
 
   @Override
-  public RaftClientReply send(Message message) throws IOException {
-    return send(RaftClientRequest.writeRequestType(), message, null);
+  public RaftClientReply send(Message message, ReplicationLevel replication) throws IOException {
+    return send(RaftClientRequest.writeRequestType(replication), message, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 072a854..232c51d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -17,10 +17,7 @@
  */
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReadRequestTypeProto;
-import org.apache.ratis.shaded.proto.RaftProtos.StaleReadRequestTypeProto;
-import org.apache.ratis.shaded.proto.RaftProtos.WriteRequestTypeProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.Preconditions;
 
 import java.util.Objects;
@@ -31,12 +28,20 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Ty
  * Request from client to server
  */
 public class RaftClientRequest extends RaftClientMessage {
-  private static final Type DEFAULT_WRITE = new Type(WriteRequestTypeProto.getDefaultInstance());
+  private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
+  private static final Type WRITE_ALL = new Type(
+      WriteRequestTypeProto.newBuilder().setReplication(ReplicationLevel.ALL).build());
+
   private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance());
   private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance());
 
-  public static Type writeRequestType() {
-    return DEFAULT_WRITE;
+  public static Type writeRequestType(ReplicationLevel replication) {
+    switch (replication) {
+      case MAJORITY: return WRITE_DEFAULT;
+      case ALL: return WRITE_ALL;
+      default:
+        throw new IllegalArgumentException("Unexpected replication: " + replication);
+    }
   }
 
   public static Type readRequestType() {
@@ -51,7 +56,7 @@ public class RaftClientRequest extends RaftClientMessage {
   /** The type of a request (oneof write, read, staleRead; see the message RaftClientRequestProto). */
   public static class Type {
     public static Type valueOf(WriteRequestTypeProto write) {
-      return DEFAULT_WRITE;
+      return writeRequestType(write.getReplication());
     }
 
     public static Type valueOf(ReadRequestTypeProto read) {
@@ -136,7 +141,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId) {
-    this(clientId, serverId, groupId, callId, 0L, null, writeRequestType());
+    this(clientId, serverId, groupId, callId, 0L, null, WRITE_DEFAULT);
   }
 
   public RaftClientRequest(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 0fa845b..3f0baf8 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -170,7 +170,13 @@ message ClientMessageEntryProto {
   bytes content = 1;
 }
 
+enum ReplicationLevel {
+  MAJORITY = 0;
+  ALL = 1;
+}
+
 message WriteRequestTypeProto {
+  ReplicationLevel replication = 1;
 }
 
 message ReadRequestTypeProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index ca27b7e..309ebf5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -448,16 +448,14 @@ public class LeaderState {
       return;
     }
 
-    final long majorityInNewConf = computeLastCommitted(followers, includeSelf);
-    final long oldLastCommitted = raftLog.getLastCommittedIndex();
-    final TermIndex[] entriesToCommit;
+    final long[] indicesInNewConf = computeCommittedIndices(followers, includeSelf);
+    final long majorityInNewConf = getMajority(indicesInNewConf);
+    final long majority;
+    final long min;
+
     if (!conf.isTransitional()) {
-      // copy the entries that may get committed out of the raftlog, to prevent
-      // the possible race that the log gets purged after the statemachine does
-      // a snapshot
-      entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
-          Math.max(majorityInNewConf, oldLastCommitted) + 1);
-      server.getState().updateStatemachine(majorityInNewConf, currentTerm);
+      majority = majorityInNewConf;
+      min = indicesInNewConf[0];
     } else { // configuration is in transitional state
       final List<FollowerInfo> oldFollowers = voterLists.get(1);
       final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
@@ -465,13 +463,23 @@ public class LeaderState {
         return;
       }
 
-      final long majorityInOldConf = computeLastCommitted(oldFollowers, includeSelfInOldConf);
-      final long majority = Math.min(majorityInNewConf, majorityInOldConf);
-      entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
-          Math.max(majority, oldLastCommitted) + 1);
+      final long[] indicesInOldConf = computeCommittedIndices(oldFollowers, includeSelfInOldConf);
+      final long majorityInOldConf = getMajority(indicesInOldConf);
+      majority = Math.min(majorityInNewConf, majorityInOldConf);
+      min = Math.min(indicesInNewConf[0], indicesInOldConf[0]);
+    }
+
+    final long oldLastCommitted = raftLog.getLastCommittedIndex();
+    if (majority > oldLastCommitted) {
+      // copy the entries out from the raftlog, in order to prevent that
+      // the log gets purged after the statemachine does a snapshot
+      final TermIndex[] entriesToCommit = raftLog.getEntries(
+          oldLastCommitted + 1, majority + 1);
       server.getState().updateStatemachine(majority, currentTerm);
+      checkAndUpdateConfiguration(entriesToCommit);
     }
-    checkAndUpdateConfiguration(entriesToCommit);
+
+    pendingRequests.checkDelayedReplies(min);
   }
 
   private boolean committedConf(TermIndex[] entries) {
@@ -529,8 +537,11 @@ public class LeaderState {
     notifySenders();
   }
 
-  private long computeLastCommitted(List<FollowerInfo> followers,
-      boolean includeSelf) {
+  static long getMajority(long[] indices) {
+    return indices[(indices.length - 1) / 2];
+  }
+
+  private long[] computeCommittedIndices(List<FollowerInfo> followers, boolean includeSelf) {
     final int length = includeSelf ? followers.size() + 1 : followers.size();
     if (length == 0) {
       throw new IllegalArgumentException("followers.size() == "
@@ -546,7 +557,7 @@ public class LeaderState {
     }
 
     Arrays.sort(indices);
-    return indices[(indices.length - 1) / 2];
+    return indices;
   }
 
   private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
@@ -567,7 +578,9 @@ public class LeaderState {
   }
 
   void replyPendingRequest(long logIndex, RaftClientReply reply) {
-    pendingRequests.replyPendingRequest(logIndex, reply);
+    if (!pendingRequests.replyPendingRequest(logIndex, reply)) {
+      submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
+    }
   }
 
   TransactionContext getTransactionContext(long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index b63cd01..10cd95f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
 
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 public class PendingRequest implements Comparable<PendingRequest> {
@@ -29,6 +33,8 @@ public class PendingRequest implements Comparable<PendingRequest> {
   private final TransactionContext entry;
   private final CompletableFuture<RaftClientReply> future;
 
+  private volatile RaftClientReply delayed;
+
   PendingRequest(long index, RaftClientRequest request,
                  TransactionContext entry) {
     this.index = index;
@@ -70,6 +76,16 @@ public class PendingRequest implements Comparable<PendingRequest> {
     future.complete(r);
   }
 
+  synchronized void setDelayedReply(RaftClientReply r) {
+    Objects.requireNonNull(r);
+    Preconditions.assertTrue(delayed == null);
+    delayed = r;
+  }
+
+  synchronized void completeDelayedReply() {
+    setReply(delayed);
+  }
+
   TransactionContext setNotLeaderException(NotLeaderException nle) {
     setReply(new RaftClientReply(getRequest(), nle, null));
     return getEntry();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index b418658..c02d7c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -18,16 +18,20 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 class PendingRequests {
@@ -71,14 +75,63 @@ class PendingRequests {
     }
   }
 
+  private static class DelayedReplies {
+    private final String name;
+    private final PriorityQueue<PendingRequest> q = new PriorityQueue<>();
+    private AtomicLong allAckedIndex = new AtomicLong();
+
+    private DelayedReplies(Object name) {
+      this.name = name + "-" + getClass().getSimpleName();
+    }
+
+    boolean delay(PendingRequest request, RaftClientReply reply) {
+      if (request.getIndex() <= allAckedIndex.get()) {
+        return false; // delay is not required.
+      }
+
+      LOG.debug("{}: delay request {}", name, request);
+      request.setDelayedReply(reply);
+      final boolean offered;
+      synchronized (q) {
+        offered = q.offer(request);
+      }
+      Preconditions.assertTrue(offered);
+      return true;
+    }
+
+    void update(final long allAcked) {
+      final long old = allAckedIndex.getAndUpdate(n -> allAcked > n? allAcked : n);
+      if (allAcked <= old) {
+        return;
+      }
+
+      LOG.debug("{}: update allAckedIndex {} -> {}", name, old, allAcked);
+      for(;;) {
+        final PendingRequest polled;
+        synchronized (q) {
+          final PendingRequest peeked = q.peek();
+          if (peeked == null || peeked.getIndex() > allAcked) {
+            return;
+          }
+          polled = q.poll();
+          Preconditions.assertTrue(polled == peeked);
+        }
+        polled.completeDelayedReply();
+      }
+    }
+  }
+
   private PendingRequest pendingSetConf;
   private final RaftServerImpl server;
   private final RequestMap pendingRequests;
   private PendingRequest last = null;
 
+  private final DelayedReplies delayedReplies;
+
   PendingRequests(RaftServerImpl server) {
     this.server = server;
     this.pendingRequests = new RequestMap(server.getId());
+    this.delayedReplies = new DelayedReplies(server.getId());
   }
 
   PendingRequest addPendingRequest(long index, RaftClientRequest request,
@@ -132,12 +185,20 @@ class PendingRequests {
     return pendingRequest != null ? pendingRequest.getEntry() : null;
   }
 
-  void replyPendingRequest(long index, RaftClientReply reply) {
+  boolean replyPendingRequest(long index, RaftClientReply reply) {
     final PendingRequest pending = pendingRequests.remove(index);
     if (pending != null) {
       Preconditions.assertTrue(pending.getIndex() == index);
+
+      final ReplicationLevel replication = pending.getRequest().getType().getWrite().getReplication();
+      if (replication == ReplicationLevel.ALL) {
+        if (delayedReplies.delay(pending, reply)) {
+          return false;
+        }
+      }
       pending.setReply(reply);
     }
+    return true;
   }
 
   /**
@@ -155,4 +216,8 @@ class PendingRequests {
       pendingSetConf.setNotLeaderException(nle);
     }
   }
+
+  void checkDelayedReplies(long allAckedIndex) {
+    delayedReplies.update(allAckedIndex);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 91a1600..74238e8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.storage.MemoryRaftLog;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.*;
@@ -450,6 +451,9 @@ public abstract class MiniRaftCluster {
   public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) {
     return servers.stream().map(RaftTestUtil::getImplAsUnchecked);
   }
+  public Stream<RaftServerImpl> getServerStream() {
+    return getServerStream(getServers());
+  }
   public Stream<RaftServerImpl> getServerAliveStream() {
     return getServerStream(getServers()).filter(RaftServerImpl::isAlive);
   }
@@ -504,7 +508,7 @@ public abstract class MiniRaftCluster {
   public RaftClientRequest newRaftClientRequest(
       ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
     return new RaftClientRequest(clientId, leaderId, getGroupId(),
-        callId, seqNum, message, RaftClientRequest.writeRequestType());
+        callId, seqNum, message, RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY));
   }
 
   public SetConfigurationRequest newSetConfigurationRequest(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3c68469..438d56a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -30,12 +30,15 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -157,7 +160,18 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
     cluster.start();
     waitForLeader(cluster);
-    RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG);
+    RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.MAJORITY, 1000, cluster, LOG);
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception {
+    LOG.info("Running testBasicAppendEntriesAsync");
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
+    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    cluster.start();
+    waitForLeader(cluster);
+    RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.ALL, 1000, cluster, LOG);
     cluster.shutdown();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index b0980f4..4deeef5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -24,12 +24,14 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -90,16 +92,39 @@ public abstract class RaftBasicTests extends BaseTest {
 
   @Test
   public void testBasicAppendEntries() throws Exception {
-    runTestBasicAppendEntries(false, 10, getCluster(), LOG);
+    runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, 10, getCluster(), LOG);
+  }
+
+  @Test
+  public void testBasicAppendEntriesWithAllReplication() throws Exception {
+    runTestBasicAppendEntries(false, ReplicationLevel.ALL, 10, getCluster(), LOG);
   }
 
   static void runTestBasicAppendEntries(
-      boolean async, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
+      boolean async, ReplicationLevel replication, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
     LOG.info("runTestBasicAppendEntries: async? " + async + ", numMessages=" + numMessages);
+    for (RaftServer s : cluster.getServers()) {
+      cluster.restartServer(s.getId(), false);
+    }
     RaftServerImpl leader = waitForLeader(cluster);
     final long term = leader.getState().getCurrentTerm();
+
     final RaftPeerId killed = cluster.getFollowers().get(0).getId();
     cluster.killServer(killed);
+
+    if (replication == ReplicationLevel.ALL) {
+      new Thread(() -> {
+        try {
+          Thread.sleep(3000);
+          LOG.info("restart server: " + killed.toString());
+          cluster.restartServer(killed, false);
+        } catch (Exception e) {
+          LOG.info("cannot restart server: " + killed.toString());
+          e.printStackTrace();
+        }
+      }).start();
+    }
+
     LOG.info(cluster.printServers());
 
     final SimpleMessage[] messages = SimpleMessage.create(numMessages);
@@ -110,7 +135,7 @@ public abstract class RaftBasicTests extends BaseTest {
 
       for (SimpleMessage message : messages) {
         if (async) {
-          client.sendAsync(message).thenAcceptAsync(reply -> {
+          client.sendAsync(message, replication).thenAcceptAsync(reply -> {
             if (!reply.isSuccess()) {
               f.completeExceptionally(
                   new AssertionError("Failed with reply " + reply));
@@ -119,7 +144,7 @@ public abstract class RaftBasicTests extends BaseTest {
             }
           });
         } else {
-          client.send(message);
+          client.send(message, replication);
         }
       }
       if (async) {
@@ -127,14 +152,16 @@ public abstract class RaftBasicTests extends BaseTest {
         Assert.assertEquals(messages.length, asyncReplyCount.get());
       }
     }
-
-    Thread.sleep(cluster.getMaxTimeout() + 100);
+    if (replication != ReplicationLevel.ALL) {
+      Thread.sleep(cluster.getMaxTimeout() + 100);
+    }
     LOG.info(cluster.printAllLogs());
 
     cluster.getServerAliveStream().map(s -> s.getState().getLog())
         .forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages));
   }
 
+
   @Test
   public void testOldLeaderCommit() throws Exception {
     LOG.info("Running testOldLeaderCommit");

Reply via email to