This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8a7fac9 RATIS-571. Client may send first request in sliding window
with firstFlag as false. Contributed by Lokesh Jain.
8a7fac9 is described below
commit 8a7fac921405baf3bfff0fc4fe41aadec2c5abeb
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Jun 4 14:20:24 2019 +0530
RATIS-571. Client may send first request in sliding window with firstFlag
as false. Contributed by Lokesh Jain.
---
.../org/apache/ratis/client/RaftClientRpc.java | 11 +++++++-
.../org/apache/ratis/client/impl/OrderedAsync.java | 29 +++++++++++++++++-----
.../apache/ratis/client/impl/RaftClientImpl.java | 28 ++++++++++++++++-----
.../java/org/apache/ratis/util/SlidingWindow.java | 25 +++++++++++--------
.../grpc/client/GrpcClientProtocolClient.java | 13 +++-------
.../apache/ratis/grpc/client/GrpcClientRpc.java | 13 +++-------
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 1 +
7 files changed, 79 insertions(+), 41 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index abdfd41..395dc59 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -47,11 +47,20 @@ public interface RaftClientRpc extends Closeable {
void addServers(Iterable<RaftPeer> servers);
/**
- * Handle the given throwable. For example, try reconnecting.
+ * Handle the given throwable. For example, try reconnecting.
*
* @return true if the given throwable is handled; otherwise, the call is an
no-op, return false.
*/
default boolean handleException(RaftPeerId serverId, Throwable t, boolean
reconnect) {
return false;
}
+
+ /**
+ * Determine if the given throwable should be handled. For example, try
reconnecting.
+ *
+ * @return true if the given throwable should be handled; otherwise, return
false.
+ */
+ default boolean shouldReconnect(Throwable t) {
+ return false;
+ }
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 4b5991c..7cb7813 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -57,6 +58,7 @@ class OrderedAsync {
private final Function<SlidingWindowEntry, RaftClientRequest>
requestConstructor;
private final long seqNum;
private volatile boolean isFirst = false;
+ private volatile RaftClientRequest request;
PendingOrderedRequest(long seqNum, Function<SlidingWindowEntry,
RaftClientRequest> requestConstructor) {
this.seqNum = seqNum;
@@ -65,7 +67,12 @@ class OrderedAsync {
@Override
RaftClientRequest newRequestImpl() {
- return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum,
isFirst));
+ request =
requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst));
+ return request;
+ }
+
+ RaftClientRequest getRequest() {
+ return request;
}
@Override
@@ -158,13 +165,13 @@ class OrderedAsync {
return;
}
- final RaftClientRequest request = pending.newRequest();
- sendRequest(request, pending.getAttemptCount()).thenAccept(reply -> {
+ sendRequest(pending).thenAccept(reply -> {
if (f.isDone()) {
return;
}
if (reply == null) {
final int attempt = pending.getAttemptCount();
+ RaftClientRequest request = pending.getRequest();
LOG.debug("schedule* attempt #{} with policy {} for {}", attempt,
retryPolicy, request);
client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt,
request),
() -> getSlidingWindow(request).retry(pending,
this::sendRequestWithRetry),
@@ -175,10 +182,20 @@ class OrderedAsync {
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
}
- private CompletableFuture<RaftClientReply> sendRequest(RaftClientRequest
request, int attemptCount) {
+ private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest
pending) {
final RetryPolicy retryPolicy = client.getRetryPolicy();
- LOG.debug("{}: send* {}", client.getId(), request);
- return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+ final CompletableFuture<RaftClientReply> f;
+ final RaftClientRequest request;
+ try(AutoCloseableLock readLock = client.readLock()) {
+ if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
+ pending.setFirstRequest();
+ }
+ request = pending.newRequest();
+ LOG.debug("{}: send* {}", client.getId(), request);
+ f = client.getClientRpc().sendRequestAsync(request);
+ }
+ int attemptCount = pending.getAttemptCount();
+ return f.thenApply(reply -> {
LOG.debug("{}: receive* {}", client.getId(), reply);
final RaftException replyException = reply != null?
reply.getException(): null;
reply = client.handleNotLeaderException(request, reply,
this::resetSlidingWindow);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index a31da14..a280b15 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.*;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -41,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -83,6 +85,7 @@ final class RaftClientImpl implements RaftClient {
private volatile RaftPeerId leaderId;
private final TimeoutScheduler scheduler;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final Supplier<OrderedAsync> orderedAsync;
@@ -353,20 +356,33 @@ final class RaftClientImpl implements RaftClient {
}
final RaftPeerId oldLeader = request.getServerId();
- final RaftPeerId curLeader = request.getServerId();
+ final RaftPeerId curLeader = leaderId;
final boolean stillLeader = oldLeader.equals(curLeader);
if (newLeader == null && stillLeader) {
newLeader = CollectionUtils.random(oldLeader,
CollectionUtils.as(peers, RaftPeer::getId));
}
- LOG.debug("{}: oldLeader={}, curLeader={}, newLeader{}", clientId,
oldLeader, curLeader, newLeader);
+ LOG.debug("{}: oldLeader={}, curLeader={}, newLeader={}", clientId,
oldLeader, curLeader, newLeader);
final boolean changeLeader = newLeader != null && stillLeader;
- if (changeLeader) {
- LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader,
newLeader);
- this.leaderId = newLeader;
+ final boolean reconnect = changeLeader || clientRpc.shouldReconnect(ioe);
+ if (reconnect) {
+ try(AutoCloseableLock writeLock = writeLock()) {
+ if (changeLeader && oldLeader.equals(leaderId)) {
+ LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader,
newLeader);
+ this.leaderId = newLeader;
+ }
+ clientRpc.handleException(oldLeader, ioe, reconnect);
+ }
}
- clientRpc.handleException(oldLeader, ioe, changeLeader);
+ }
+
+ AutoCloseableLock readLock() {
+ return AutoCloseableLock.acquire(lock.readLock());
+ }
+
+ private AutoCloseableLock writeLock() {
+ return AutoCloseableLock.acquire(lock.writeLock());
}
void assertScheduler(int numThreads) {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index b572832..7ce83d9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
import java.util.Iterator;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
@@ -64,7 +65,7 @@ public interface SlidingWindow {
static boolean LOG_REPEATEDLY = false;
private final Object name;
/** Request map: seqNum -> request */
- private final SortedMap<Long, REQUEST> requests = new TreeMap<>();
+ private final SortedMap<Long, REQUEST> requests = new
ConcurrentSkipListMap<>();
RequestMap(Object name) {
this.name = name;
@@ -77,11 +78,11 @@ public interface SlidingWindow {
return name;
}
- synchronized boolean isEmpty() {
+ boolean isEmpty() {
return requests.isEmpty();
}
- private synchronized REQUEST get(long seqNum) {
+ private REQUEST get(long seqNum) {
return requests.get(seqNum);
}
@@ -106,11 +107,11 @@ public interface SlidingWindow {
return request;
}
- synchronized long firstSeqNum() {
+ long firstSeqNum() {
return requests.firstKey();
}
- synchronized long lastSeqNum() {
+ long lastSeqNum() {
return requests.lastKey();
}
@@ -120,7 +121,7 @@ public interface SlidingWindow {
return requests.values().iterator();
}
- synchronized void putNewRequest(REQUEST request) {
+ void putNewRequest(REQUEST request) {
final long seqNum = request.getSeqNum();
CollectionUtils.putNew(seqNum, request, requests, () -> getName() +
":requests");
}
@@ -143,7 +144,7 @@ public interface SlidingWindow {
return true;
}
- synchronized void endOfRequests(long nextToProcess, REQUEST end,
Consumer<REQUEST> replyMethod) {
+ void endOfRequests(long nextToProcess, REQUEST end, Consumer<REQUEST>
replyMethod) {
final REQUEST nextToProcessRequest = requests.get(nextToProcess);
Preconditions.assertNull(nextToProcessRequest,
() -> "nextToProcessRequest = " + nextToProcessRequest + " != null,
nextToProcess = " + nextToProcess);
@@ -161,12 +162,12 @@ public interface SlidingWindow {
putNewRequest(end);
}
- synchronized void clear() {
+ void clear() {
LOG.debug("close {}", this);
requests.clear();
}
- synchronized void log() {
+ void log() {
LOG.debug(this.toString());
for(REQUEST r : requests.values()) {
LOG.debug(" {}: hasReply? {}", r.getSeqNum(), r.hasReply());
@@ -174,7 +175,7 @@ public interface SlidingWindow {
}
@Override
- public synchronized String toString() {
+ public String toString() {
return getName() + ": requests" + asString(requests);
}
@@ -374,6 +375,10 @@ public interface SlidingWindow {
private void alreadyClosed(REQUEST request, Throwable e) {
request.fail(new AlreadyClosedException(requests.getName() + " is
closed.", e));
}
+
+ public boolean isFirst(long seqNum) {
+ return seqNum == (firstSeqNum != -1 ? firstSeqNum :
requests.firstSeqNum());
+ }
}
/**
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 2ed5df0..e6a2c54 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -180,12 +180,12 @@ public class GrpcClientProtocolClient implements
Closeable {
AsyncStreamObservers getOrderedStreamObservers() {
return orderedStreamObservers.updateAndGet(
- a -> a != null? a : new AsyncStreamObservers(orderedStreamObservers,
this::ordered));
+ a -> a != null? a : new AsyncStreamObservers(this::ordered));
}
AsyncStreamObservers getUnorderedAsyncStreamObservers() {
return unorderedStreamObservers.updateAndGet(
- a -> a != null? a : new AsyncStreamObservers(unorderedStreamObservers,
asyncStub::unordered));
+ a -> a != null? a : new AsyncStreamObservers(asyncStub::unordered));
}
public RaftPeer getTarget() {
@@ -275,12 +275,9 @@ public class GrpcClientProtocolClient implements Closeable
{
}
};
private final RequestStreamer requestStreamer;
- private final AtomicReference<AsyncStreamObservers> ref;
- AsyncStreamObservers(AtomicReference<AsyncStreamObservers> ref,
- Function<StreamObserver<RaftClientReplyProto>,
StreamObserver<RaftClientRequestProto>> f) {
+ AsyncStreamObservers(Function<StreamObserver<RaftClientReplyProto>,
StreamObserver<RaftClientRequestProto>> f) {
this.requestStreamer = new RequestStreamer(f.apply(replyStreamObserver));
- this.ref = ref;
}
CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
@@ -290,7 +287,7 @@ public class GrpcClientProtocolClient implements Closeable {
}
try {
if
(!requestStreamer.onNext(ClientProtoUtils.toRaftClientRequestProto(request))) {
- throw new AlreadyClosedException(getName() + ": the stream is
closed.");
+ return JavaUtils.completeExceptionally(new
AlreadyClosedException(getName() + ": the stream is closed."));
}
} catch(Throwable t) {
handleReplyFuture(request.getCallId(), future ->
future.completeExceptionally(t));
@@ -317,8 +314,6 @@ public class GrpcClientProtocolClient implements Closeable {
}
private void completeReplyExceptionally(Throwable t, String event) {
- ref.compareAndSet(this, null);
-
final Map<Long, CompletableFuture<RaftClientReply>> map =
replies.getAndSetNull();
if (map == null) {
return;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index deaaac0..31b34ee 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -163,18 +163,13 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
}
@Override
- public boolean handleException(RaftPeerId serverId, Throwable e, boolean
reconnect) {
+ public boolean shouldReconnect(Throwable e) {
final Throwable cause = e.getCause();
if (e instanceof IOException && cause instanceof StatusRuntimeException) {
- if (!((StatusRuntimeException) cause).getStatus().isOk()) {
- reconnect = true;
- }
+ return !((StatusRuntimeException) cause).getStatus().isOk();
} else if (e instanceof IllegalArgumentException) {
- if (e.getMessage().contains("null frame before EOS")) {
- reconnect = true;
- }
+ return e.getMessage().contains("null frame before EOS");
}
- LOG.debug("{}->{}: reconnect? {}, e={}, cause={}", clientId, serverId,
reconnect, e, cause);
- return super.handleException(serverId, e, reconnect);
+ return false;
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 177e0f7..05048b7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -159,6 +159,7 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
// send one more request which should timeout.
final RaftClientRequest requestTimeout = newRaftClientRequest(client,
leader.getId(), seqNum.incrementAndGet());
+ rpc.handleException(leader.getId(), new Exception(), true);
final CompletableFuture<RaftClientReply> f =
rpc.sendRequestAsync(requestTimeout);
testFailureCase("request should timeout", f::get,
ExecutionException.class, TimeoutIOException.class);