Repository: incubator-ratis Updated Branches: refs/heads/master f7f97e050 -> 7872f3296
RATIS-140. Raft client should reuse the gRPC stream for all async calls. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7872f329 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7872f329 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7872f329 Branch: refs/heads/master Commit: 7872f32962b5fdb5229be6bfc36eb73ec01ff79f Parents: f7f97e0 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Thu Dec 28 14:44:04 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Thu Dec 28 14:44:04 2017 +0800 ---------------------------------------------------------------------- .../ratis/client/RaftClientConfigKeys.java | 2 +- .../ratis/client/impl/ClientProtoUtils.java | 6 +- .../ratis/client/impl/RaftClientImpl.java | 96 +++-- .../org/apache/ratis/util/SlidingWindow.java | 403 +++++++++++++++++++ .../apache/ratis/grpc/client/GrpcClientRpc.java | 9 +- .../grpc/client/RaftClientProtocolClient.java | 96 ++++- .../grpc/client/RaftClientProtocolService.java | 137 +++---- .../java/org/apache/ratis/RaftAsyncTests.java | 3 +- .../java/org/apache/ratis/RaftTestUtil.java | 5 - 9 files changed, 648 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index 03f12cb..bb76910 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -59,7 +59,7 @@ public interface RaftClientConfigKeys { static int schedulerThreads(RaftProperties properties) { return getInt(properties::getInt, SCHEDULER_THREADS_KEY, - SCHEDULER_THREADS_DEFAULT); + SCHEDULER_THREADS_DEFAULT, requireMin(1)); } static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 97439ac..a7aaf54 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -209,10 +209,14 @@ public interface ClientProtoUtils { } static Message toMessage(final ClientMessageEntryProto p) { + return toMessage(p.getContent()); + } + + static Message toMessage(final ByteString bytes) { return new Message() { @Override public ByteString getContent() { - return p.getContent(); + return bytes; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index ba1a107..6ee415d 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -21,18 +21,18 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.util.IOUtils; -import org.apache.ratis.util.CollectionUtils; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.*; +import org.apache.ratis.util.*; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.LongFunction; import java.util.function.Supplier; import java.util.stream.Stream; @@ -44,6 +44,45 @@ final class RaftClientImpl implements RaftClient { return callIdCounter.getAndIncrement() & Long.MAX_VALUE; } + static class PendingAsyncRequest implements SlidingWindow.Request<RaftClientReply> { + private final long seqNum; + private final LongFunction<RaftClientRequest> requestConstructor; + private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>(); + + PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) { + this.seqNum = seqNum; + this.requestConstructor = requestConstructor; + } + + RaftClientRequest newRequest() { + return requestConstructor.apply(seqNum); + } + + @Override + public long getSeqNum() { + return seqNum; + } + + @Override + public boolean hasReply() { + return replyFuture.isDone(); + } + + @Override + public void setReply(RaftClientReply reply) { + replyFuture.complete(reply); + } + + CompletableFuture<RaftClientReply> getReplyFuture() { + return replyFuture; + } + + @Override + public String toString() { + return "[seq=" + getSeqNum() + "]"; + } + } + private final ClientId clientId; private final RaftClientRpc clientRpc; private final Collection<RaftPeer> peers; @@ -52,7 +91,7 @@ final class RaftClientImpl implements RaftClient { private volatile RaftPeerId leaderId; - private final AtomicLong asyncSeqNum = new AtomicLong(); + private final SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> slidingWindow; private final ScheduledExecutorService scheduler; private final Semaphore asyncRequestSemaphore; @@ -68,13 +107,10 @@ final class RaftClientImpl implements RaftClient { asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties)); scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties)); + slidingWindow = new SlidingWindow.Client<>(getId()); clientRpc.addServers(peers); } - private long nextSeqNum() { - return asyncSeqNum.getAndIncrement() & Long.MAX_VALUE; - } - @Override public ClientId getId() { return clientId; @@ -100,9 +136,10 @@ final class RaftClientImpl implements RaftClient { "Interrupted when sending " + message, e)); } final long callId = nextCallId(); - final long seqNum = nextSeqNum(); - return sendRequestWithRetryAsync( - () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly) + final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum, + seq -> new RaftClientRequest(clientId, leaderId, groupId, callId, seq, message, readOnly)); + return slidingWindow.submitNewRequest(constructor, this::sendRequestWithRetryAsync + ).getReplyFuture( ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new) ).whenComplete((r, e) -> asyncRequestSemaphore.release()); } @@ -164,13 +201,14 @@ final class RaftClientImpl implements RaftClient { } private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync( - Supplier<RaftClientRequest> supplier) { - return sendRequestAsync(supplier.get()).thenComposeAsync(reply -> { - final CompletableFuture<RaftClientReply> f = new CompletableFuture<>(); + PendingAsyncRequest pending) { + final RaftClientRequest request = pending.newRequest(); + final CompletableFuture<RaftClientReply> f = pending.getReplyFuture(); + return sendRequestAsync(request).thenCompose(reply -> { if (reply == null) { final TimeUnit unit = retryInterval.getUnit(); - scheduler.schedule(() -> sendRequestWithRetryAsync(supplier) - .thenApply(r -> f.complete(r)), retryInterval.toLong(unit), unit); + scheduler.schedule(() -> slidingWindow.retry(pending, this::sendRequestWithRetryAsync), + retryInterval.toLong(unit), unit); } else { f.complete(reply); } @@ -204,14 +242,23 @@ final class RaftClientImpl implements RaftClient { LOG.debug("{}: send* {}", clientId, request); return clientRpc.sendRequestAsync(request).thenApply(reply -> { LOG.debug("{}: receive* {}", clientId, reply); - return handleNotLeaderException(request, reply); + reply = handleNotLeaderException(request, reply); + if (reply != null) { + slidingWindow.receiveReply( + request.getSeqNum(), reply, this::sendRequestWithRetryAsync); + } + return reply; }).exceptionally(e -> { LOG.debug("{}: Failed {} with {}", clientId, request, e); - final Throwable cause = e.getCause(); - if (cause instanceof GroupMismatchException) { - return new RaftClientReply(request, (RaftException) cause); - } else if (cause instanceof IOException) { - handleIOException(request, (IOException) cause, null); + if (e instanceof CompletionException) { + e = e.getCause(); + } + if (e instanceof GroupMismatchException) { + throw new CompletionException(e); + } else if (e instanceof IOException) { + handleIOException(request, (IOException)e, null); + } else { + throw new CompletionException(e); } return null; }); @@ -281,6 +328,7 @@ final class RaftClientImpl implements RaftClient { LOG.trace("Stack trace", new Throwable("TRACE")); } + slidingWindow.resetFirstSeqNum(); if (ioe instanceof LeaderNotReadyException) { return; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java new file mode 100644 index 0000000..6ded6f7 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.LongFunction; + +/** + * A single-client-to-multiple-server sliding window. + * The client only talks to a server at any time. + * When the current server fails, the client fails over to another server. + */ +public interface SlidingWindow { + Logger LOG = LoggerFactory.getLogger(SlidingWindow.class); + + interface Request<REPLY> { + long getSeqNum(); + + void setReply(REPLY reply); + + boolean hasReply(); + } + + /** A seqNum-to-request map, sorted by seqNum. */ + class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> { + private final Object name; + /** Request map: seqNum -> request */ + private final SortedMap<Long, REQUEST> requests = new TreeMap<>(); + + RequestMap(Object name) { + this.name = name; + if (LOG.isDebugEnabled()) { + JavaUtils.runRepeatedly(() -> log(), 5, 10, TimeUnit.SECONDS); + } + } + + Object getName() { + return name; + } + + boolean isEmpty() { + return requests.isEmpty(); + } + + /** + * If the request with the given seqNum is non-replied, return it. + * Otherwise, return null. + * + * A request is non-replied if + * (1) it is in the request map, and + * (2) it does not has reply. + */ + REQUEST getNonRepliedRequest(long seqNum, String op) { + final REQUEST request = requests.get(seqNum); + if (request == null) { + LOG.debug("{}: {}, seq={} not found in {}", getName(), op, seqNum, this); + return null; + } + if (request.hasReply()) { + LOG.debug("{}: {}, seq={} already has replied in {}", getName(), op, seqNum, this); + return null; + } + return request; + } + + long firstSeqNum() { + return requests.firstKey(); + } + + long lastSeqNum() { + return requests.lastKey(); + } + + /** Iterate the requests in the order of seqNum. */ + @Override + public Iterator<REQUEST> iterator() { + return requests.values().iterator(); + } + + void putNewRequest(REQUEST request) { + final long seqNum = request.getSeqNum(); + CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests"); + } + + /** + * Set reply for the request with the given seqNum if it is non-replied. + * Otherwise, do nothing. + * + * @return true iff this method does set the reply for the request. + */ + boolean setReply(long seqNum, REPLY reply, String op) { + final REQUEST request = getNonRepliedRequest(seqNum, op); + if (request == null) { + LOG.debug("{}: DUPLICATED reply {} for seq={} in {}", getName(), reply, seqNum, this); + return false; + } + + LOG.debug("{}: set reply {} for seq={} in {}", getName(), reply, seqNum, this); + request.setReply(reply); + return true; + } + + synchronized void clear() { + LOG.debug("close {}", this); + requests.clear(); + } + + synchronized void log() { + LOG.debug(this.toString()); + for(REQUEST r : requests.values()) { + LOG.debug(" {}: hasReply? {}", r.getSeqNum(), r.hasReply()); + } + } + + @Override + public String toString() { + return getName() + ": requests" + asString(requests); + } + + private static String asString(SortedMap<Long, ?> map) { + return map.isEmpty()? "[]": "[" + map.firstKey() + ".." + map.lastKey() + "]"; + } + } + + /** + * Client side sliding window. + * A client may + * (1) allocate seqNum for new requests; + * (2) send requests/retries to the server; + * (3) receive replies/exceptions from the server; + * (4) return the replies/exceptions to client. + * + * Depend on the replies/exceptions, the client may retry the requests + * to the same or a different server. + */ + class Client<REQUEST extends Request<REPLY>, REPLY> { + /** The requests in the sliding window. */ + private final RequestMap<REQUEST, REPLY> requests; + /** Delayed requests. */ + private final SortedMap<Long, Long> delayedRequests = new TreeMap<>(); + + /** The seqNum for the next new request. */ + private long nextSeqNum = 0; + /** The seqNum of the first request. */ + private long firstSeqNum = -1; + /** Is the first request replied? */ + private boolean firstReplied; + + public Client(Object name) { + this.requests = new RequestMap<REQUEST, REPLY>(name) { + @Override + synchronized void log() { + LOG.debug(toString()); + for (REQUEST r : requests) { + LOG.debug(" {}: {}", r.getSeqNum(), r.hasReply() ? "replied" + : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted"); + } + } + }; + } + + @Override + public synchronized String toString() { + return requests + ", nextSeqNum=" + nextSeqNum + + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied + + ", delayed=" + delayedRequests.keySet(); + } + + /** + * A new request arrives, create it with {@link #nextSeqNum} + * and then try sending it to the server. + * + * @param requestConstructor use seqNum to create a new request. + * @return the new request. + */ + public synchronized REQUEST submitNewRequest( + LongFunction<REQUEST> requestConstructor, Consumer<REQUEST> sendMethod) { + if (!requests.isEmpty()) { + Preconditions.assertTrue(nextSeqNum == requests.lastSeqNum() + 1, + () -> "nextSeqNum=" + nextSeqNum + " but " + this); + } + + final long seqNum = nextSeqNum++; + final REQUEST r = requestConstructor.apply(seqNum); + requests.putNewRequest(r); + + final boolean submitted = sendOrDelayRequest(r, sendMethod); + LOG.debug("{}: submitting a new request {} in {}? {}", + requests.getName(), r, this, submitted? "submitted": "delayed"); + return r; + } + + private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod) { + final long seqNum = request.getSeqNum(); + Preconditions.assertTrue(requests.getNonRepliedRequest(seqNum, "sendOrDelayRequest") == request); + + if (firstReplied) { + // already received the reply for the first request, submit any request. + sendMethod.accept(request); + return true; + } + + if (firstSeqNum == -1 && seqNum == requests.firstSeqNum()) { + // first request is not yet submitted and this is the first request, submit it. + LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this); + firstSeqNum = seqNum; + sendMethod.accept(request); + return true; + } + + // delay other requests + CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests"); + return false; + } + + /** Receive a retry from an existing request (may out-of-order). */ + public synchronized void retry(REQUEST request, Consumer<REQUEST> sendMethod) { + if (requests.getNonRepliedRequest(request.getSeqNum(), "retry") != request) { + // out-dated or invalid retry + LOG.debug("{}: Ignore retry {} in {}", requests.getName(), request, this); + return; + } + final boolean submitted = sendOrDelayRequest(request, sendMethod); + LOG.debug("{}: submitting a retry {} in {}? {}", + requests.getName(), request, this, submitted? "submitted": "delayed"); + } + + private void removeRepliedFromHead() { + for (final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) { + final REQUEST r = i.next(); + if (!r.hasReply()) { + return; + } + } + } + + /** + * Receive a reply with the given seqNum (may out-of-order). + * It may trigger the client to send delayed requests. + */ + public synchronized void receiveReply( + long seqNum, REPLY reply, Consumer<REQUEST> sendMethod) { + if (!requests.setReply(seqNum, reply, "receiveReply")) { + return; // request already replied + } + if (seqNum == firstSeqNum) { + firstReplied = true; // received the reply for the first submitted request + } + removeRepliedFromHead(); + trySendDelayed(sendMethod); + } + + private void trySendDelayed(Consumer<REQUEST> sendMethod) { + if (firstReplied) { + // after first received, all other requests can be submitted (out-of-order) + if (!delayedRequests.isEmpty()) { + for (Long seqNum : delayedRequests.keySet()) { + sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed")); + } + delayedRequests.clear(); + } + } else { + // Otherwise, submit the first only if it is a delayed request + final Iterator<REQUEST> i = requests.iterator(); + if (i.hasNext()) { + final REQUEST r = i.next(); + final Long delayed = delayedRequests.remove(r.getSeqNum()); + if (delayed != null) { + sendOrDelayRequest(r, sendMethod); + } + } + } + } + + /** Reset the {@link #firstSeqNum} The stream has an error. */ + public synchronized void resetFirstSeqNum() { + firstSeqNum = -1; + firstReplied = false; + LOG.debug("After resetFirstSeqNum: {}", this); + } + } + + /** + * Server side sliding window. + * A server may + * (1) receive requests from client; + * (2) submit the requests for processing; + * (3) receive replies from the processing unit; + * (4) send replies to the client. + */ + class Server<REQUEST extends Request<REPLY>, REPLY> implements Closeable { + /** The requests in the sliding window. */ + private final RequestMap<REQUEST, REPLY> requests; + /** The end of requests */ + private final REQUEST end; + + private long nextToProcess = -1; + + public Server(Object name, REQUEST end) { + this.requests = new RequestMap<>(name); + this.end = end; + Preconditions.assertTrue(end.getSeqNum() == Long.MAX_VALUE); + } + + @Override + public synchronized String toString() { + return requests + ", nextToProcess=" + nextToProcess; + } + + /** A request (or a retry) arrives (may be out-of-order except for the first request). */ + public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) { + final long seqNum = request.getSeqNum(); + if (nextToProcess == -1) { + nextToProcess = seqNum; + LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this); + } else { + LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this); + } + requests.putNewRequest(request); + processRequestsFromHead(processingMethod); + } + + private void processRequestsFromHead(Consumer<REQUEST> processingMethod) { + for(REQUEST r : requests) { + if (r.getSeqNum() != nextToProcess) { + return; + } + processingMethod.accept(r); + nextToProcess++; + } + } + + /** + * Receives a reply for the given seqNum (may out-of-order) from the processor. + * It may trigger sending replies to client or processing more requests. + */ + public synchronized void receiveReply( + long seqNum, REPLY reply, Consumer<REQUEST> replyMethod, Consumer<REQUEST> processingMethod) { + if (!requests.setReply(seqNum, reply, "receiveReply")) { + return; // request already replied + } + sendRepliesFromHead(replyMethod); + processRequestsFromHead(processingMethod); + } + + private void sendRepliesFromHead( + Consumer<REQUEST> replyMethod + ) { + for(final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) { + final REQUEST r = i.next(); + if (!r.hasReply()) { + return; + } + replyMethod.accept(r); + if (r == end) { + return; + } + } + } + + /** + * Signal the end of requests. + * @return true if no more outstanding requests. + */ + public synchronized boolean endOfRequests() { + if (requests.isEmpty()) { + return true; + } else { + LOG.debug("{}: put end-of-request in {}", requests.getName(), this); + requests.putNewRequest(end); + return false; + } + } + + @Override + public void close() { + requests.clear(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index c5c188e..ea1f204 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -58,7 +58,9 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie RaftClientRequest request) { final RaftPeerId serverId = request.getServerId(); try { - return sendRequestAsync(request, getProxies().getProxy(serverId)); + final RaftClientProtocolClient proxy = getProxies().getProxy(serverId); + // Reuse the same grpc stream for all async calls. + return proxy.getAppendStreamObservers().onNext(request); } catch (IOException e) { return JavaUtils.completeExceptionally(e); } @@ -83,7 +85,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie return ClientProtoUtils.toServerInformationReply( proxy.serverInformation(proto)); } else { - final CompletableFuture<RaftClientReply> f = sendRequestAsync(request, proxy); + final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy); // TODO: timeout support try { return f.get(); @@ -96,12 +98,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie } } - private CompletableFuture<RaftClientReply> sendRequestAsync( + private CompletableFuture<RaftClientReply> sendRequest( RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException { final RaftClientRequestProto requestProto = toRaftClientRequestProto(request); final CompletableFuture<RaftClientReplyProto> replyFuture = new CompletableFuture<>(); + // create a new grpc stream for each non-async call. final StreamObserver<RaftClientRequestProto> requestObserver = proxy.append(new StreamObserver<RaftClientReplyProto>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index ace90f2..0b05475 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -17,9 +17,9 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.ManagedChannel; import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; @@ -31,12 +31,17 @@ import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; import org.apache.ratis.util.CheckedSupplier; +import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; public class RaftClientProtocolClient implements Closeable { @@ -49,6 +54,8 @@ public class RaftClientProtocolClient implements Closeable { private final RaftClientProtocolServiceStub asyncStub; private final AdminProtocolServiceBlockingStub adminBlockingStub; + private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>(); + public RaftClientProtocolClient(ClientId id, RaftPeer target) { this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; @@ -65,6 +72,10 @@ public class RaftClientProtocolClient implements Closeable { @Override public void close() { + final AsyncStreamObservers observers = appendStreamObservers.get(); + if (observers != null) { + observers.close(); + } channel.shutdownNow(); } @@ -98,7 +109,88 @@ public class RaftClientProtocolClient implements Closeable { return asyncStub.append(responseHandler); } + AsyncStreamObservers getAppendStreamObservers() { + return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers()); + } + public RaftPeer getTarget() { return target; } + + class AsyncStreamObservers implements Closeable { + /** Request map: callId -> future */ + private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>()); + private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() { + @Override + public void onNext(RaftClientReplyProto proto) { + final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get(); + if (map == null) { + LOG.warn("replyStreamObserver onNext map == null"); + return; + } + final long callId = proto.getRpcReply().getCallId(); + try { + final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto); + final NotLeaderException nle = reply.getNotLeaderException(); + if (nle != null) { + completeReplyExceptionally(nle, NotLeaderException.class.getName()); + return; + } + map.remove(callId).complete(reply); + } catch (Throwable t) { + map.get(callId).completeExceptionally(t); + } + } + + @Override + public void onError(Throwable t) { + final IOException ioe = RaftGrpcUtil.unwrapIOException(t); + completeReplyExceptionally(ioe, "onError"); + } + + @Override + public void onCompleted() { + completeReplyExceptionally(null, "completed"); + } + }; + private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver); + + CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) { + final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get(); + if (map == null) { + return JavaUtils.completeExceptionally(new IOException("Already closed.")); + } + final CompletableFuture<RaftClientReply> f = new CompletableFuture<>(); + CollectionUtils.putNew(request.getCallId(), f, map, + () -> getName() + ":" + getClass().getSimpleName()); + try { + requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request)); + } catch(Throwable t) { + f.completeExceptionally(t); + } + return f; + } + + @Override + public void close() { + requestStreamObserver.onCompleted(); + completeReplyExceptionally(null, "close"); + } + + private void completeReplyExceptionally(Throwable t, String event) { + appendStreamObservers.compareAndSet(this, null); + final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null); + if (map == null) { + return; + } + for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) { + final CompletableFuture<RaftClientReply> f = entry.getValue(); + if (!f.isDone()) { + f.completeExceptionally(t != null? t + : new IOException(getName() + ": Stream " + event + + ": no reply for async request cid=" + entry.getKey())); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index f3ebe0f..6d19920 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -25,17 +25,21 @@ import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.SlidingWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.io.IOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase { - static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); + public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); - private static class PendingAppend implements Comparable<PendingAppend> { + private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> { private final RaftClientRequest request; private volatile RaftClientReply reply; @@ -43,25 +47,27 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase this.request = request; } - boolean isReady() { + @Override + public boolean hasReply() { return reply != null || this == COMPLETED; } - void setReply(RaftClientReply reply) { + @Override + public void setReply(RaftClientReply reply) { this.reply = reply; } - RaftClientRequest getRequest() { - return request; + RaftClientReply getReply() { + return reply; } - long getSeqNum() { - return request != null? request.getSeqNum(): Long.MAX_VALUE; + RaftClientRequest getRequest() { + return request; } @Override - public int compareTo(PendingAppend that) { - return Long.compare(this.getSeqNum(), that.getSeqNum()); + public long getSeqNum() { + return request != null? request.getSeqNum(): Long.MAX_VALUE; } @Override @@ -97,97 +103,84 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase return new AppendRequestStreamObserver(responseObserver); } + private final AtomicInteger streamCount = new AtomicInteger(); + private class AppendRequestStreamObserver implements StreamObserver<RaftClientRequestProto> { - private final List<PendingAppend> pendingList = new LinkedList<>(); + private final String name = getId() + "-" + streamCount.getAndIncrement(); private final StreamObserver<RaftClientReplyProto> responseObserver; + private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow + = new SlidingWindow.Server<>(name, COMPLETED); AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) { + LOG.debug("new AppendRequestStreamObserver {}", name); this.responseObserver = ro; } + void processClientRequestAsync(PendingAppend pending) { + try { + protocol.submitClientRequestAsync(pending.getRequest() + ).thenAcceptAsync(reply -> slidingWindow.receiveReply( + pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync) + ).exceptionally(exception -> { + // TODO: the exception may be from either raft or state machine. + // Currently we skip all the following responses when getting an + // exception from the state machine. + responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest()); + return null; + }); + } catch (IOException e) { + throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e); + } + } + @Override public void onNext(RaftClientRequestProto request) { try { final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request); final PendingAppend p = new PendingAppend(r); - final long replySeq = p.getSeqNum(); - synchronized (pendingList) { - pendingList.add(p); - } - - protocol.submitClientRequestAsync(r - ).whenCompleteAsync((reply, exception) -> { - if (exception != null) { - // TODO: the exception may be from either raft or state machine. - // Currently we skip all the following responses when getting an - // exception from the state machine. - responseObserver.onError(RaftGrpcUtil.wrapException(exception)); - } else { - synchronized (pendingList) { - Preconditions.assertTrue(!pendingList.isEmpty(), - "PendingList is empty when handling onNext for seqNum %s", replySeq); - final long headSeqNum = pendingList.get(0).getSeqNum(); - // stream seqNum is consecutive - final PendingAppend pendingForReply = pendingList.get( - (int) (replySeq - headSeqNum)); - Preconditions.assertTrue(pendingForReply != null && - pendingForReply.getSeqNum() == replySeq, - "pending for reply is: %s, the pending list: %s", - pendingForReply, pendingList); - pendingForReply.setReply(reply); - - if (headSeqNum == replySeq) { - Collection<PendingAppend> readySet = new ArrayList<>(); - // if this is head, we send back all the ready responses - Iterator<PendingAppend> iter = pendingList.iterator(); - PendingAppend pending; - while (iter.hasNext() && ((pending = iter.next()).isReady())) { - readySet.add(pending); - iter.remove(); - } - sendReadyReplies(readySet); - } - } - } - }); + slidingWindow.receivedRequest(p, this::processClientRequestAsync); } catch (Throwable e) { - LOG.info("{} got exception when handling client append request {}: {}", - getId(), request.getRpcRequest(), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); + responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request)); } } - private void sendReadyReplies(Collection<PendingAppend> readySet) { - readySet.forEach(ready -> { - Preconditions.assertTrue(ready.isReady()); + private void sendReply(PendingAppend ready) { + Preconditions.assertTrue(ready.hasReply()); if (ready == COMPLETED) { - responseObserver.onCompleted(); + close(); } else { + LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply()); responseObserver.onNext( - ClientProtoUtils.toRaftClientReplyProto(ready.reply)); + ClientProtoUtils.toRaftClientReplyProto(ready.getReply())); } - }); } @Override public void onError(Throwable t) { // for now we just log a msg - LOG.warn("{} onError: client Append cancelled", getId(), t); - synchronized (pendingList) { - pendingList.clear(); - } + LOG.warn(name + ": onError", t); + slidingWindow.close(); } @Override public void onCompleted() { - synchronized (pendingList) { - if (pendingList.isEmpty()) { - responseObserver.onCompleted(); - } else { - pendingList.add(COMPLETED); - } + if (slidingWindow.endOfRequests()) { + close(); } } + + private void close() { + LOG.debug("{}: close", name); + responseObserver.onCompleted(); + slidingWindow.close(); + } + + void responseError(Throwable t, Supplier<String> message) { + t = JavaUtils.unwrapCompletionException(t); + LOG.warn(name + ": Failed " + message.get(), t); + responseObserver.onError(RaftGrpcUtil.wrapException(t)); + slidingWindow.close(); + } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index b8bc636..e5f41b7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -140,10 +140,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba @Test public void testBasicAppendEntriesAsync() throws Exception { LOG.info("Running testBasicAppendEntriesAsync"); + RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100); final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties); cluster.start(); waitForLeader(cluster); - RaftBasicTests.runTestBasicAppendEntries(true, 10, cluster, LOG); + RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG); cluster.shutdown(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index c8dfc0d..c55445a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -189,11 +189,6 @@ public interface RaftTestUtil { } } - if (async) { - Collections.sort(entries, Comparator - .comparing(e -> e.getSmLogEntry().getData().toStringUtf8())); - } - long logIndex = 0; Assert.assertEquals(expectedMessages.length, entries.size()); for (int i = 0; i < expectedMessages.length; i++) {
