This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4b613c1 RATIS-568. Server sliding window should fail out of order
requests on close. Contributed by Tsz Wo Nicholas Sze.
4b613c1 is described below
commit 4b613c1e2e91e91de7a84dc378a2e67da7fb092b
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed May 29 10:17:54 2019 +0530
RATIS-568. Server sliding window should fail out of order requests on
close. Contributed by Tsz Wo Nicholas Sze.
---
.../apache/ratis/client/impl/RaftClientImpl.java | 14 ++--
.../apache/ratis/client/impl/UnorderedAsync.java | 2 +-
.../org/apache/ratis/protocol/RaftClientReply.java | 6 ++
.../java/org/apache/ratis/util/SlidingWindow.java | 79 ++++++++++++++--------
.../grpc/client/GrpcClientProtocolService.java | 38 +++++++----
5 files changed, 88 insertions(+), 51 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9caf95e..024f6c7 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -206,7 +206,7 @@ final class RaftClientImpl implements RaftClient {
slidingWindowEntry -> newRaftClientRequest(server, callId, message,
type, slidingWindowEntry));
return getSlidingWindow(server).submitNewRequest(constructor,
this::sendRequestWithRetryAsync
).getReplyFuture(
- ).thenApply(reply -> handleStateMachineException(reply,
CompletionException::new)
+ ).thenApply(reply -> handleRaftException(reply, CompletionException::new)
).whenComplete((r, e) -> asyncRequestSemaphore.release());
}
@@ -415,16 +415,16 @@ final class RaftClientImpl implements RaftClient {
}
LOG.debug("{}: receive {}", clientId, reply);
reply = handleNotLeaderException(request, reply, false);
- reply = handleStateMachineException(reply, Function.identity());
+ reply = handleRaftException(reply, Function.identity());
return reply;
}
- static <E extends Throwable> RaftClientReply handleStateMachineException(
- RaftClientReply reply, Function<StateMachineException, E> converter)
throws E {
+ static <E extends Throwable> RaftClientReply handleRaftException(
+ RaftClientReply reply, Function<RaftException, E> converter) throws E {
if (reply != null) {
- final StateMachineException sme = reply.getStateMachineException();
- if (sme != null) {
- throw converter.apply(sme);
+ final RaftException e = reply.getException();
+ if (e != null) {
+ throw converter.apply(e);
}
}
return reply;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index d5ceeaf..7637e8d 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -57,7 +57,7 @@ public interface UnorderedAsync {
() -> client.newRaftClientRequest(null, callId, null, type, null));
sendRequestWithRetry(pending, client);
return pending.getReplyFuture()
- .thenApply(reply -> RaftClientImpl.handleStateMachineException(reply,
CompletionException::new));
+ .thenApply(reply -> RaftClientImpl.handleRaftException(reply,
CompletionException::new));
}
static void sendRequestWithRetry(PendingClientRequest pending,
RaftClientImpl client) {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 9539806..f06a0e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -67,6 +67,7 @@ public class RaftClientReply extends RaftClientMessage {
Preconditions.assertTrue(!success,
() -> "Inconsistent parameters: success && exception != null: " +
this);
Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
+ AlreadyClosedException.class,
NotLeaderException.class, NotReplicatedException.class,
StateMachineException.class),
() -> "Unexpected exception class: " + this);
}
@@ -130,6 +131,11 @@ public class RaftClientReply extends RaftClientMessage {
return message;
}
+ /** If this reply has {@link AlreadyClosedException}, return it; otherwise
return null. */
+ public AlreadyClosedException getAlreadyClosedException() {
+ return JavaUtils.cast(exception, AlreadyClosedException.class);
+ }
+
/** If this reply has {@link NotLeaderException}, return it; otherwise
return null. */
public NotLeaderException getNotLeaderException() {
return JavaUtils.cast(exception, NotLeaderException.class);
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 8a3237a..b572832 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -37,18 +37,22 @@ import java.util.function.LongFunction;
public interface SlidingWindow {
Logger LOG = LoggerFactory.getLogger(SlidingWindow.class);
+ static String getName(Class<?> clazz, Object name) {
+ return SlidingWindow.class.getSimpleName() + "$" + clazz.getSimpleName()
+ ":" + name;
+ }
+
interface Request<REPLY> {
long getSeqNum();
void setReply(REPLY reply);
boolean hasReply();
+
+ void fail(Throwable e);
}
interface ClientSideRequest<REPLY> extends Request<REPLY> {
void setFirstRequest();
-
- void fail(Throwable e);
}
interface ServerSideRequest<REPLY> extends Request<REPLY> {
@@ -65,7 +69,7 @@ public interface SlidingWindow {
RequestMap(Object name) {
this.name = name;
if (LOG_REPEATEDLY && LOG.isDebugEnabled()) {
- JavaUtils.runRepeatedly(() -> log(), 5, 10, TimeUnit.SECONDS);
+ JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS);
}
}
@@ -73,10 +77,14 @@ public interface SlidingWindow {
return name;
}
- boolean isEmpty() {
+ synchronized boolean isEmpty() {
return requests.isEmpty();
}
+ private synchronized REQUEST get(long seqNum) {
+ return requests.get(seqNum);
+ }
+
/**
* If the request with the given seqNum is non-replied, return it.
* Otherwise, return null.
@@ -86,7 +94,7 @@ public interface SlidingWindow {
* (2) it does not has reply.
*/
REQUEST getNonRepliedRequest(long seqNum, String op) {
- final REQUEST request = requests.get(seqNum);
+ final REQUEST request = get(seqNum);
if (request == null) {
LOG.debug("{}: {}, seq={} not found in {}", getName(), op, seqNum,
this);
return null;
@@ -98,11 +106,11 @@ public interface SlidingWindow {
return request;
}
- long firstSeqNum() {
+ synchronized long firstSeqNum() {
return requests.firstKey();
}
- long lastSeqNum() {
+ synchronized long lastSeqNum() {
return requests.lastKey();
}
@@ -112,7 +120,7 @@ public interface SlidingWindow {
return requests.values().iterator();
}
- void putNewRequest(REQUEST request) {
+ synchronized void putNewRequest(REQUEST request) {
final long seqNum = request.getSeqNum();
CollectionUtils.putNew(seqNum, request, requests, () -> getName() +
":requests");
}
@@ -123,8 +131,8 @@ public interface SlidingWindow {
*
* @return true iff this method does set the reply for the request.
*/
- boolean setReply(long seqNum, REPLY reply, String op) {
- final REQUEST request = getNonRepliedRequest(seqNum, op);
+ boolean setReply(long seqNum, REPLY reply) {
+ final REQUEST request = getNonRepliedRequest(seqNum, "setReply");
if (request == null) {
LOG.debug("{}: DUPLICATED reply {} for seq={} in {}", getName(),
reply, seqNum, this);
return false;
@@ -135,6 +143,24 @@ public interface SlidingWindow {
return true;
}
+ synchronized void endOfRequests(long nextToProcess, REQUEST end,
Consumer<REQUEST> replyMethod) {
+ final REQUEST nextToProcessRequest = requests.get(nextToProcess);
+ Preconditions.assertNull(nextToProcessRequest,
+ () -> "nextToProcessRequest = " + nextToProcessRequest + " != null,
nextToProcess = " + nextToProcess);
+
+ final SortedMap<Long, REQUEST> tail = requests.tailMap(nextToProcess);
+ for (REQUEST r : tail.values()) {
+ final AlreadyClosedException e = new AlreadyClosedException(
+ getName() + " is closing: seq = " + r.getSeqNum() + " >
nextToProcess = " + nextToProcess
+ + " will NEVER be processed; request = " + r);
+ r.fail(e);
+ replyMethod.accept(r);
+ }
+ tail.clear();
+
+ putNewRequest(end);
+ }
+
synchronized void clear() {
LOG.debug("close {}", this);
requests.clear();
@@ -148,7 +174,7 @@ public interface SlidingWindow {
}
@Override
- public String toString() {
+ public synchronized String toString() {
return getName() + ": requests" + asString(requests);
}
@@ -184,7 +210,7 @@ public interface SlidingWindow {
private Throwable exception;
public Client(Object name) {
- this.requests = new RequestMap<REQUEST, REPLY>(name) {
+ this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(),
name)) {
@Override
synchronized void log() {
LOG.debug(toString());
@@ -284,7 +310,7 @@ public interface SlidingWindow {
*/
public synchronized void receiveReply(
long seqNum, REPLY reply, Consumer<REQUEST> sendMethod) {
- if (!requests.setReply(seqNum, reply, "receiveReply")) {
+ if (!requests.setReply(seqNum, reply)) {
return; // request already replied
}
if (seqNum == firstSeqNum) {
@@ -346,8 +372,7 @@ public interface SlidingWindow {
}
private void alreadyClosed(REQUEST request, Throwable e) {
- request.fail(new
AlreadyClosedException(SlidingWindow.class.getSimpleName() + "$" +
getClass().getSimpleName()
- + " " + requests.getName() + " is closed.", e));
+ request.fail(new AlreadyClosedException(requests.getName() + " is
closed.", e));
}
}
@@ -368,7 +393,7 @@ public interface SlidingWindow {
private long nextToProcess = -1;
public Server(Object name, REQUEST end) {
- this.requests = new RequestMap<>(name);
+ this.requests = new RequestMap<>(getName(getClass(), name));
this.end = end;
Preconditions.assertTrue(end.getSeqNum() == Long.MAX_VALUE);
}
@@ -404,20 +429,16 @@ public interface SlidingWindow {
/**
* Receives a reply for the given seqNum (may out-of-order) from the
processor.
- * It may trigger sending replies to client or processing more requests.
+ * It may trigger sending replies to client.
*/
- public synchronized void receiveReply(
- long seqNum, REPLY reply, Consumer<REQUEST> replyMethod,
Consumer<REQUEST> processingMethod) {
- if (!requests.setReply(seqNum, reply, "receiveReply")) {
+ public synchronized void receiveReply(long seqNum, REPLY reply,
Consumer<REQUEST> replyMethod) {
+ if (!requests.setReply(seqNum, reply)) {
return; // request already replied
}
sendRepliesFromHead(replyMethod);
- processRequestsFromHead(processingMethod);
}
- private void sendRepliesFromHead(
- Consumer<REQUEST> replyMethod
- ) {
+ private void sendRepliesFromHead(Consumer<REQUEST> replyMethod) {
for(final Iterator<REQUEST> i = requests.iterator(); i.hasNext();
i.remove()) {
final REQUEST r = i.next();
if (!r.hasReply()) {
@@ -434,14 +455,14 @@ public interface SlidingWindow {
* Signal the end of requests.
* @return true if no more outstanding requests.
*/
- public synchronized boolean endOfRequests() {
+ public synchronized boolean endOfRequests(Consumer<REQUEST> replyMethod) {
if (requests.isEmpty()) {
return true;
- } else {
- LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
- requests.putNewRequest(end);
- return false;
}
+
+ LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
+ requests.endOfRequests(nextToProcess, end, replyMethod);
+ return false;
}
@Override
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index b0a7578..9c2fc0c 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -48,24 +49,31 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
private static class PendingOrderedRequest implements
SlidingWindow.ServerSideRequest<RaftClientReply> {
private final RaftClientRequest request;
- private volatile RaftClientReply reply;
+ private final AtomicReference<RaftClientReply> reply = new
AtomicReference<>();
PendingOrderedRequest(RaftClientRequest request) {
this.request = request;
}
@Override
+ public void fail(Throwable t) {
+ Preconditions.assertTrue(t instanceof RaftException, () -> "Requires
RaftException but " + t);
+ setReply(new RaftClientReply(request, (RaftException) t, null));
+ }
+
+ @Override
public boolean hasReply() {
- return reply != null || this == COMPLETED;
+ return getReply() != null || this == COMPLETED;
}
@Override
- public void setReply(RaftClientReply reply) {
- this.reply = reply;
+ public void setReply(RaftClientReply r) {
+ final boolean set = reply.compareAndSet(null, r);
+ Preconditions.assertTrue(set, () -> "Reply is already set: request=" +
request + ", reply=" + reply);
}
RaftClientReply getReply() {
- return reply;
+ return reply.get();
}
RaftClientRequest getRequest() {
@@ -105,7 +113,7 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
// Okay if an existing object is removed by another mean during the
iteration since it must be already closed.
// Also okay if a new object is added during the iteration since this
method closes only the existing objects.
for(OrderedRequestStreamObserver so : map.values()) {
- so.close();
+ so.close(true);
}
}
}
@@ -304,7 +312,7 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
void processClientRequest(PendingOrderedRequest pending) {
final long seq = pending.getSeqNum();
processClientRequest(pending.getRequest(),
- reply -> slidingWindow.receiveReply(seq, reply, this::sendReply,
this::processClientRequest));
+ reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
}
@Override
@@ -315,7 +323,7 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
private void sendReply(PendingOrderedRequest ready) {
Preconditions.assertTrue(ready.hasReply());
if (ready == COMPLETED) {
- close();
+ close(true);
} else {
LOG.debug("{}: sendReply seq={}, {}", getName(), ready.getSeqNum(),
ready.getReply());
responseNext(ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
@@ -326,20 +334,22 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
public void onError(Throwable t) {
// for now we just log a msg
GrpcUtil.warn(LOG, () -> getName() + ": onError", t);
- slidingWindow.close();
+ close(false);
}
@Override
public void onCompleted() {
- if (slidingWindow.endOfRequests()) {
- close();
+ if (slidingWindow.endOfRequests(this::sendReply)) {
+ close(true);
}
}
- private void close() {
+ private void close(boolean complete) {
if (setClose()) {
LOG.debug("{}: close", getName());
- responseCompleted();
+ if (complete) {
+ responseCompleted();
+ }
slidingWindow.close();
orderedStreamObservers.removeExisting(this);
}
@@ -348,7 +358,7 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
@Override
boolean responseError(Throwable t, Supplier<String> message) {
if (super.responseError(t, message)) {
- slidingWindow.close();
+ close(false);
return true;
}
return false;