Repository: incubator-ratis
Updated Branches:
  refs/heads/master 00f64b4c1 -> c692bf201
RATIS-208. Allow client to specify replication level in a request. Contributed by Kit Hui Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c692bf20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c692bf20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c692bf20 Branch: refs/heads/master Commit: c692bf201d01a67e2c85efcd625e0bf80c96758b Parents: 00f64b4 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Tue Apr 10 19:10:30 2018 +0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Tue Apr 10 19:10:30 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 23 ++++++- .../ratis/client/impl/RaftClientImpl.java | 9 +-- .../ratis/protocol/RaftClientRequest.java | 23 ++++--- ratis-proto-shaded/src/main/proto/Raft.proto | 6 ++ .../apache/ratis/server/impl/LeaderState.java | 49 ++++++++------ .../ratis/server/impl/PendingRequest.java | 18 +++++- .../ratis/server/impl/PendingRequests.java | 67 +++++++++++++++++++- .../java/org/apache/ratis/MiniRaftCluster.java | 6 +- .../java/org/apache/ratis/RaftAsyncTests.java | 18 +++++- .../java/org/apache/ratis/RaftBasicTests.java | 39 ++++++++++-- 10 files changed, 214 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 84fec9e..5562f59 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -23,6 +23,7 @@ 
import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +46,17 @@ public interface RaftClient extends Closeable { * Async call to send the given message to the raft service. * The message may change the state of the service. * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead. + * + * @param message The request message. + * @param replication The replication level required. + * @return a future of the reply. */ - CompletableFuture<RaftClientReply> sendAsync(Message message); + CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication); + + /** The same as sendAsync(message, MAJORITY). */ + default CompletableFuture<RaftClientReply> sendAsync(Message message) { + return sendAsync(message, ReplicationLevel.MAJORITY); + } /** Async call to send the given readonly message to the raft service. */ CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message); @@ -58,8 +68,17 @@ public interface RaftClient extends Closeable { * Send the given message to the raft service. * The message may change the state of the service. * For readonly messages, use {@link #sendReadOnly(Message)} instead. + * + * @param message The request message. + * @param replication The replication level required. + * @return the reply. */ - RaftClientReply send(Message message) throws IOException; + RaftClientReply send(Message message, ReplicationLevel replication) throws IOException; + + /** The same as send(message, MAJORITY). */ + default RaftClientReply send(Message message) throws IOException { + return send(message, ReplicationLevel.MAJORITY); + } /** Send the given readonly message to the raft service. 
*/ RaftClientReply sendReadOnly(Message message) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index f0abb10..e8a897b 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.*; import java.io.IOException; @@ -129,8 +130,8 @@ final class RaftClientImpl implements RaftClient { } @Override - public CompletableFuture<RaftClientReply> sendAsync(Message message) { - return sendAsync(RaftClientRequest.writeRequestType(), message, null); + public CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication) { + return sendAsync(RaftClientRequest.writeRequestType(replication), message, null); } @Override @@ -168,8 +169,8 @@ final class RaftClientImpl implements RaftClient { } @Override - public RaftClientReply send(Message message) throws IOException { - return send(RaftClientRequest.writeRequestType(), message, null); + public RaftClientReply send(Message message, ReplicationLevel replication) throws IOException { + return send(RaftClientRequest.writeRequestType(replication), message, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java 
---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 072a854..232c51d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -17,10 +17,7 @@ */ package org.apache.ratis.protocol; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReadRequestTypeProto; -import org.apache.ratis.shaded.proto.RaftProtos.StaleReadRequestTypeProto; -import org.apache.ratis.shaded.proto.RaftProtos.WriteRequestTypeProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.Preconditions; import java.util.Objects; @@ -31,12 +28,20 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Ty * Request from client to server */ public class RaftClientRequest extends RaftClientMessage { - private static final Type DEFAULT_WRITE = new Type(WriteRequestTypeProto.getDefaultInstance()); + private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance()); + private static final Type WRITE_ALL = new Type( + WriteRequestTypeProto.newBuilder().setReplication(ReplicationLevel.ALL).build()); + private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance()); private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance()); - public static Type writeRequestType() { - return DEFAULT_WRITE; + public static Type writeRequestType(ReplicationLevel replication) { + switch (replication) { + case MAJORITY: return WRITE_DEFAULT; + case ALL: return WRITE_ALL; + default: + throw new IllegalArgumentException("Unexpected replication: " + replication); + } } public static Type readRequestType() { @@ 
-51,7 +56,7 @@ public class RaftClientRequest extends RaftClientMessage { /** The type of a request (oneof write, read, staleRead; see the message RaftClientRequestProto). */ public static class Type { public static Type valueOf(WriteRequestTypeProto write) { - return DEFAULT_WRITE; + return writeRequestType(write.getReplication()); } public static Type valueOf(ReadRequestTypeProto read) { @@ -136,7 +141,7 @@ public class RaftClientRequest extends RaftClientMessage { public RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) { - this(clientId, serverId, groupId, callId, 0L, null, writeRequestType()); + this(clientId, serverId, groupId, callId, 0L, null, WRITE_DEFAULT); } public RaftClientRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 0fa845b..3f0baf8 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -170,7 +170,13 @@ message ClientMessageEntryProto { bytes content = 1; } +enum ReplicationLevel { + MAJORITY = 0; + ALL = 1; +} + message WriteRequestTypeProto { + ReplicationLevel replication = 1; } message ReadRequestTypeProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index ca27b7e..309ebf5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -448,16 +448,14 @@ public class 
LeaderState { return; } - final long majorityInNewConf = computeLastCommitted(followers, includeSelf); - final long oldLastCommitted = raftLog.getLastCommittedIndex(); - final TermIndex[] entriesToCommit; + final long[] indicesInNewConf = computeCommittedIndices(followers, includeSelf); + final long majorityInNewConf = getMajority(indicesInNewConf); + final long majority; + final long min; + if (!conf.isTransitional()) { - // copy the entries that may get committed out of the raftlog, to prevent - // the possible race that the log gets purged after the statemachine does - // a snapshot - entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, - Math.max(majorityInNewConf, oldLastCommitted) + 1); - server.getState().updateStatemachine(majorityInNewConf, currentTerm); + majority = majorityInNewConf; + min = indicesInNewConf[0]; } else { // configuration is in transitional state final List<FollowerInfo> oldFollowers = voterLists.get(1); final boolean includeSelfInOldConf = conf.containsInOldConf(selfId); @@ -465,13 +463,23 @@ public class LeaderState { return; } - final long majorityInOldConf = computeLastCommitted(oldFollowers, includeSelfInOldConf); - final long majority = Math.min(majorityInNewConf, majorityInOldConf); - entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, - Math.max(majority, oldLastCommitted) + 1); + final long[] indicesInOldConf = computeCommittedIndices(oldFollowers, includeSelfInOldConf); + final long majorityInOldConf = getMajority(indicesInOldConf); + majority = Math.min(majorityInNewConf, majorityInOldConf); + min = Math.min(indicesInNewConf[0], indicesInOldConf[0]); + } + + final long oldLastCommitted = raftLog.getLastCommittedIndex(); + if (majority > oldLastCommitted) { + // copy the entries out from the raftlog, in order to prevent that + // the log gets purged after the statemachine does a snapshot + final TermIndex[] entriesToCommit = raftLog.getEntries( + oldLastCommitted + 1, majority + 1); 
server.getState().updateStatemachine(majority, currentTerm); + checkAndUpdateConfiguration(entriesToCommit); } - checkAndUpdateConfiguration(entriesToCommit); + + pendingRequests.checkDelayedReplies(min); } private boolean committedConf(TermIndex[] entries) { @@ -529,8 +537,11 @@ public class LeaderState { notifySenders(); } - private long computeLastCommitted(List<FollowerInfo> followers, - boolean includeSelf) { + static long getMajority(long[] indices) { + return indices[(indices.length - 1) / 2]; + } + + private long[] computeCommittedIndices(List<FollowerInfo> followers, boolean includeSelf) { final int length = includeSelf ? followers.size() + 1 : followers.size(); if (length == 0) { throw new IllegalArgumentException("followers.size() == " @@ -546,7 +557,7 @@ public class LeaderState { } Arrays.sort(indices); - return indices[(indices.length - 1) / 2]; + return indices; } private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) { @@ -567,7 +578,9 @@ public class LeaderState { } void replyPendingRequest(long logIndex, RaftClientReply reply) { - pendingRequests.replyPendingRequest(logIndex, reply); + if (!pendingRequests.replyPendingRequest(logIndex, reply)) { + submitUpdateStateEvent(UPDATE_COMMIT_EVENT); + } } TransactionContext getTransactionContext(long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index b63cd01..10cd95f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -17,10 +17,14 @@ */ package org.apache.ratis.server.impl; -import org.apache.ratis.protocol.*; +import 
org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; +import java.util.Objects; import java.util.concurrent.CompletableFuture; public class PendingRequest implements Comparable<PendingRequest> { @@ -29,6 +33,8 @@ public class PendingRequest implements Comparable<PendingRequest> { private final TransactionContext entry; private final CompletableFuture<RaftClientReply> future; + private volatile RaftClientReply delayed; + PendingRequest(long index, RaftClientRequest request, TransactionContext entry) { this.index = index; @@ -70,6 +76,16 @@ public class PendingRequest implements Comparable<PendingRequest> { future.complete(r); } + synchronized void setDelayedReply(RaftClientReply r) { + Objects.requireNonNull(r); + Preconditions.assertTrue(delayed == null); + delayed = r; + } + + synchronized void completeDelayedReply() { + setReply(delayed); + } + TransactionContext setNotLeaderException(NotLeaderException nle) { setReply(new RaftClientReply(getRequest(), nle, null)); return getEntry(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index b418658..c02d7c3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -18,16 +18,20 @@ package org.apache.ratis.server.impl; import org.apache.ratis.protocol.*; +import 
org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; +import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; class PendingRequests { @@ -71,14 +75,63 @@ class PendingRequests { } } + private static class DelayedReplies { + private final String name; + private final PriorityQueue<PendingRequest> q = new PriorityQueue<>(); + private AtomicLong allAckedIndex = new AtomicLong(); + + private DelayedReplies(Object name) { + this.name = name + "-" + getClass().getSimpleName(); + } + + boolean delay(PendingRequest request, RaftClientReply reply) { + if (request.getIndex() <= allAckedIndex.get()) { + return false; // delay is not required. + } + + LOG.debug("{}: delay request {}", name, request); + request.setDelayedReply(reply); + final boolean offered; + synchronized (q) { + offered = q.offer(request); + } + Preconditions.assertTrue(offered); + return true; + } + + void update(final long allAcked) { + final long old = allAckedIndex.getAndUpdate(n -> allAcked > n? 
allAcked : n); + if (allAcked <= old) { + return; + } + + LOG.debug("{}: update allAckedIndex {} -> {}", name, old, allAcked); + for(;;) { + final PendingRequest polled; + synchronized (q) { + final PendingRequest peeked = q.peek(); + if (peeked == null || peeked.getIndex() > allAcked) { + return; + } + polled = q.poll(); + Preconditions.assertTrue(polled == peeked); + } + polled.completeDelayedReply(); + } + } + } + private PendingRequest pendingSetConf; private final RaftServerImpl server; private final RequestMap pendingRequests; private PendingRequest last = null; + private final DelayedReplies delayedReplies; + PendingRequests(RaftServerImpl server) { this.server = server; this.pendingRequests = new RequestMap(server.getId()); + this.delayedReplies = new DelayedReplies(server.getId()); } PendingRequest addPendingRequest(long index, RaftClientRequest request, @@ -132,12 +185,20 @@ class PendingRequests { return pendingRequest != null ? pendingRequest.getEntry() : null; } - void replyPendingRequest(long index, RaftClientReply reply) { + boolean replyPendingRequest(long index, RaftClientReply reply) { final PendingRequest pending = pendingRequests.remove(index); if (pending != null) { Preconditions.assertTrue(pending.getIndex() == index); + + final ReplicationLevel replication = pending.getRequest().getType().getWrite().getReplication(); + if (replication == ReplicationLevel.ALL) { + if (delayedReplies.delay(pending, reply)) { + return false; + } + } pending.setReply(reply); } + return true; } /** @@ -155,4 +216,8 @@ class PendingRequests { pendingSetConf.setNotLeaderException(nle); } } + + void checkDelayedReplies(long allAckedIndex) { + delayedReplies.update(allAckedIndex); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 91a1600..74238e8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -28,6 +28,7 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.storage.MemoryRaftLog; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.*; @@ -450,6 +451,9 @@ public abstract class MiniRaftCluster { public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) { return servers.stream().map(RaftTestUtil::getImplAsUnchecked); } + public Stream<RaftServerImpl> getServerStream() { + return getServerStream(getServers()); + } public Stream<RaftServerImpl> getServerAliveStream() { return getServerStream(getServers()).filter(RaftServerImpl::isAlive); } @@ -504,7 +508,7 @@ public abstract class MiniRaftCluster { public RaftClientRequest newRaftClientRequest( ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) { return new RaftClientRequest(clientId, leaderId, getGroupId(), - callId, seqNum, message, RaftClientRequest.writeRequestType()); + callId, seqNum, message, RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY)); } public SetConfigurationRequest newSetConfigurationRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 3c68469..438d56a 100644 --- 
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -30,12 +30,15 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; -import org.junit.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; import java.util.ArrayList; @@ -157,7 +160,18 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties); cluster.start(); waitForLeader(cluster); - RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG); + RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.MAJORITY, 1000, cluster, LOG); + cluster.shutdown(); + } + + @Test + public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception { + LOG.info("Running testBasicAppendEntriesAsync"); + RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100); + final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties); + cluster.start(); + waitForLeader(cluster); + RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.ALL, 1000, cluster, LOG); cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index b0980f4..4deeef5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -24,12 +24,14 @@ import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; @@ -90,16 +92,39 @@ public abstract class RaftBasicTests extends BaseTest { @Test public void testBasicAppendEntries() throws Exception { - runTestBasicAppendEntries(false, 10, getCluster(), LOG); + runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, 10, getCluster(), LOG); + } + + @Test + public void testBasicAppendEntriesWithAllReplication() throws Exception { + runTestBasicAppendEntries(false, ReplicationLevel.ALL, 10, getCluster(), LOG); } static void runTestBasicAppendEntries( - boolean async, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception { + boolean async, ReplicationLevel replication, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception { LOG.info("runTestBasicAppendEntries: async? 
" + async + ", numMessages=" + numMessages); + for (RaftServer s : cluster.getServers()) { + cluster.restartServer(s.getId(), false); + } RaftServerImpl leader = waitForLeader(cluster); final long term = leader.getState().getCurrentTerm(); + final RaftPeerId killed = cluster.getFollowers().get(0).getId(); cluster.killServer(killed); + + if (replication == ReplicationLevel.ALL) { + new Thread(() -> { + try { + Thread.sleep(3000); + LOG.info("restart server: " + killed.toString()); + cluster.restartServer(killed, false); + } catch (Exception e) { + LOG.info("cannot restart server: " + killed.toString()); + e.printStackTrace(); + } + }).start(); + } + LOG.info(cluster.printServers()); final SimpleMessage[] messages = SimpleMessage.create(numMessages); @@ -110,7 +135,7 @@ public abstract class RaftBasicTests extends BaseTest { for (SimpleMessage message : messages) { if (async) { - client.sendAsync(message).thenAcceptAsync(reply -> { + client.sendAsync(message, replication).thenAcceptAsync(reply -> { if (!reply.isSuccess()) { f.completeExceptionally( new AssertionError("Failed with reply " + reply)); @@ -119,7 +144,7 @@ public abstract class RaftBasicTests extends BaseTest { } }); } else { - client.send(message); + client.send(message, replication); } } if (async) { @@ -127,14 +152,16 @@ public abstract class RaftBasicTests extends BaseTest { Assert.assertEquals(messages.length, asyncReplyCount.get()); } } - - Thread.sleep(cluster.getMaxTimeout() + 100); + if (replication != ReplicationLevel.ALL) { + Thread.sleep(cluster.getMaxTimeout() + 100); + } LOG.info(cluster.printAllLogs()); cluster.getServerAliveStream().map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages)); } + @Test public void testOldLeaderCommit() throws Exception { LOG.info("Running testOldLeaderCommit");