This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c7b35c3 RATIS-529. Leader should limit pending requests to avoid OOM.
Contributed by Tsz Wo Nicholas Sze.
c7b35c3 is described below
commit c7b35c3f43fd52bf7669f19fc86c1b05b71bef0e
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri May 10 00:54:22 2019 +0530
RATIS-529. Leader should limit pending requests to avoid OOM. Contributed
by Tsz Wo Nicholas Sze.
---
.../apache/ratis/client/impl/RaftClientImpl.java | 31 +++---
.../ratis/protocol/RaftRetryFailureException.java | 9 +-
.../ResourceUnavailableException.java} | 14 ++-
.../org/apache/ratis/util/ResourceSemaphore.java | 52 +++++++++
.../java/org/apache/ratis/util/SlidingWindow.java | 7 +-
.../java/org/apache/ratis/util/TimeDuration.java | 20 +++-
.../src/test/java/org/apache/ratis/BaseTest.java | 12 ++
.../main/java/org/apache/ratis/grpc/GrpcUtil.java | 7 +-
.../apache/ratis/server/RaftServerConfigKeys.java | 67 +++++++----
.../org/apache/ratis/server/impl/LeaderState.java | 10 +-
.../apache/ratis/server/impl/PendingRequests.java | 92 ++++++++++++---
.../apache/ratis/server/impl/RaftServerImpl.java | 14 ++-
.../apache/ratis/server/impl/WatchRequests.java | 45 +++++---
.../java/org/apache/ratis/MiniRaftCluster.java | 27 ++---
.../apache/ratis/RequestLimitAsyncBaseTest.java | 124 +++++++++++++++++++++
.../java/org/apache/ratis/WatchRequestTests.java | 13 ++-
.../ratis/server/impl/RaftServerTestUtil.java | 3 +
.../statemachine/SimpleStateMachine4Testing.java | 42 ++++++-
.../ratis/grpc/TestRequestLimitAsyncWithGrpc.java | 16 ++-
19 files changed, 487 insertions(+), 118 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ddcad0e..9caf95e 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -327,18 +327,24 @@ final class RaftClientImpl implements RaftClient {
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
}
- private RaftClientReply sendRequestWithRetry(
- Supplier<RaftClientRequest> supplier)
- throws InterruptedIOException, StateMachineException,
GroupMismatchException {
- for(int attemptCount = 0;; attemptCount++) {
+ private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest>
supplier) throws IOException {
+ for(int attemptCount = 1;; attemptCount++) {
final RaftClientRequest request = supplier.get();
- final RaftClientReply reply = sendRequest(request);
- if (reply != null) {
- return reply;
+ IOException ioe = null;
+ try {
+ final RaftClientReply reply = sendRequest(request);
+ if (reply != null) {
+ return reply;
+ }
+ } catch (GroupMismatchException | StateMachineException e) {
+ throw e;
+ } catch (IOException e) {
+ ioe = e;
}
if (!retryPolicy.shouldRetry(attemptCount, request)) {
- return null;
+ throw (IOException)noMoreRetries(request, attemptCount, retryPolicy,
ioe);
}
+
try {
retryPolicy.getSleepTime(attemptCount, request).sleep();
} catch (InterruptedException e) {
@@ -385,8 +391,7 @@ final class RaftClientImpl implements RaftClient {
if (attemptCount == 1 && throwable != null) {
return throwable;
}
- return new RaftRetryFailureException(
- "Failed " + request + " for " + (attemptCount-1) + " attempts with " +
policy, throwable);
+ return new RaftRetryFailureException(request, attemptCount, policy,
throwable);
}
private void handleAsyncRetryFailure(RaftClientRequest request, int
attemptCount, Throwable throwable) {
@@ -397,16 +402,16 @@ final class RaftClientImpl implements RaftClient {
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
}
- private RaftClientReply sendRequest(RaftClientRequest request)
- throws StateMachineException, GroupMismatchException {
+ private RaftClientReply sendRequest(RaftClientRequest request) throws
IOException {
LOG.debug("{}: send {}", clientId, request);
- RaftClientReply reply = null;
+ RaftClientReply reply;
try {
reply = clientRpc.sendRequest(request);
} catch (GroupMismatchException gme) {
throw gme;
} catch (IOException ioe) {
handleIOException(request, ioe, null, false);
+ throw ioe;
}
LOG.debug("{}: receive {}", clientId, reply);
reply = handleNotLeaderException(request, reply, false);
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
index 0294bb4..6221b7c 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
@@ -17,11 +17,14 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.retry.RetryPolicy;
+
/**
- * Retry failure as per the retryPolicy defined.
+ * Retry failure as per the {@link RetryPolicy} defined.
*/
public class RaftRetryFailureException extends RaftException {
- public RaftRetryFailureException(String message, Throwable cause) {
- super(message, cause);
+ public RaftRetryFailureException(
+ RaftClientRequest request, int attemptCount, RetryPolicy retryPolicy,
Throwable cause) {
+ super("Failed " + request + " for " + attemptCount + " attempts with " +
retryPolicy, cause);
}
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ResourceUnavailableException.java
similarity index 74%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
copy to
ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ResourceUnavailableException.java
index 0294bb4..92b4abb 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ResourceUnavailableException.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.protocol;
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftException;
/**
- * Retry failure as per the retryPolicy defined.
+ * A particular resource is unavailable.
*/
-public class RaftRetryFailureException extends RaftException {
- public RaftRetryFailureException(String message, Throwable cause) {
- super(message, cause);
+public class ResourceUnavailableException extends RaftException {
+ public ResourceUnavailableException(String message) {
+ super(message);
}
-}
\ No newline at end of file
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
new file mode 100644
index 0000000..b76fd77
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link Semaphore} with an element limit for tracking resources.
+ *
+ * After {@link #close()}, the resource becomes unavailable, i.e. any acquire will not succeed.
+ */
+public class ResourceSemaphore extends Semaphore {
+ private final int elementLimit;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
+
+ public ResourceSemaphore(int elementLimit) {
+ super(elementLimit, true);
+ this.elementLimit = elementLimit;
+ }
+
+ /** Close the resource. */
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ reducePermits(elementLimit);
+ }
+ }
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":elementLimit=" + elementLimit +
",closed?" + isClosed;
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 8d5c1b6..8a3237a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -393,11 +393,12 @@ public interface SlidingWindow {
private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
for(REQUEST r : requests) {
- if (r.getSeqNum() != nextToProcess) {
+ if (r.getSeqNum() > nextToProcess) {
return;
+ } else if (r.getSeqNum() == nextToProcess) {
+ processingMethod.accept(r);
+ nextToProcess++;
}
- processingMethod.accept(r);
- nextToProcess++;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index b5205f3..51dfe7a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
/**
@@ -203,7 +204,24 @@ public final class TimeDuration implements
Comparable<TimeDuration> {
/** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. */
public void sleep() throws InterruptedException {
- unit.sleep(duration);
+ sleep(null);
+ }
+
+ public void sleep(Consumer<Object> log) throws InterruptedException {
+ if (log != null) {
+ log.accept(StringUtils.stringSupplierAsObject(() -> "Start sleeping " +
this));
+ }
+ try {
+ unit.sleep(duration);
+ if (log != null) {
+ log.accept(StringUtils.stringSupplierAsObject(() -> "Completed
sleeping " + this));
+ }
+ } catch(InterruptedException ie) {
+ if (log != null) {
+ log.accept(StringUtils.stringSupplierAsObject(() -> "Interrupted
sleeping " + this));
+ }
+ throw ie;
+ }
}
@Override
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 278f9ca..6598ba5 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -37,6 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -46,6 +47,7 @@ public abstract class BaseTest {
public static final TimeDuration HUNDRED_MILLIS = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
+ public static final TimeDuration FIVE_SECONDS = TimeDuration.valueOf(5,
TimeUnit.SECONDS);
{
LogUtils.setLogLevel(ConfUtils.LOG, Level.WARN);
@@ -117,6 +119,9 @@ public abstract class BaseTest {
String description, CheckedRunnable<?> testCode,
Class<? extends Throwable> expectedThrowableClass, Logger log,
Class<? extends Throwable>... expectedCauseClasses) {
+ if (log != null) {
+ log.info("run '{}'", description);
+ }
try {
testCode.run();
} catch (Throwable t) {
@@ -139,6 +144,9 @@ public abstract class BaseTest {
String description, Supplier<CompletableFuture<?>> testCode,
Class<? extends Throwable> expectedThrowableClass, Logger log,
Class<? extends Throwable>... expectedCauseClasses) {
+ if (log != null) {
+ log.info("run '{}'", description);
+ }
try {
testCode.get().join();
} catch (Throwable t) {
@@ -155,4 +163,8 @@ public abstract class BaseTest {
Class<? extends Throwable>... expectedCauseClasses) {
return testFailureCaseAsync(description, testCode, expectedThrowableClass,
LOG, expectedCauseClasses);
}
+
+ static <T> T getWithDefaultTimeout(Future<T> future) throws Exception {
+ return future.get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index ce524db..082d0de 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.ServerNotReadyException;
+import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
@@ -76,6 +77,11 @@ public interface GrpcUtil {
}
static IOException tryUnwrapException(StatusRuntimeException se) {
+ final Status status = se.getStatus();
+ if (status != null && status.getCode() == Status.Code.DEADLINE_EXCEEDED) {
+ return new TimeoutIOException(status.getDescription(), se);
+ }
+
final Metadata trailers = se.getTrailers();
if (trailers == null) {
return null;
@@ -90,7 +96,6 @@ public interface GrpcUtil {
}
}
- final Status status = se.getStatus();
if (status != null) {
final String className = trailers.get(EXCEPTION_TYPE_KEY);
if (className != null) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index b4f7faf..e643bf4 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -78,27 +78,52 @@ public interface RaftServerConfigKeys {
}
- String WATCH_TIMEOUT_DENOMINATION_KEY = PREFIX +
".watch.timeout.denomination";
- TimeDuration WATCH_TIMEOUT_DENOMINATION_DEFAULT = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
- static TimeDuration watchTimeoutDenomination(RaftProperties properties) {
- return
getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DENOMINATION_DEFAULT.getUnit()),
- WATCH_TIMEOUT_DENOMINATION_KEY, WATCH_TIMEOUT_DENOMINATION_DEFAULT,
getDefaultLog(), requirePositive());
- }
- static void setWatchTimeoutDenomination(RaftProperties properties,
TimeDuration watchTimeout) {
- setTimeDuration(properties::setTimeDuration,
WATCH_TIMEOUT_DENOMINATION_KEY, watchTimeout);
- }
+ interface Write {
+ String PREFIX = RaftServerConfigKeys.PREFIX + ".write";
- /**
- * Timeout for watch requests.
- */
- String WATCH_TIMEOUT_KEY = PREFIX + ".watch.timeout";
- TimeDuration WATCH_TIMEOUT_DEFAULT = TimeDuration.valueOf(10,
TimeUnit.SECONDS);
- static TimeDuration watchTimeout(RaftProperties properties) {
- return
getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DEFAULT.getUnit()),
- WATCH_TIMEOUT_KEY, WATCH_TIMEOUT_DEFAULT, getDefaultLog(),
requirePositive());
+ String ELEMENT_LIMIT_KEY = PREFIX + ".element-limit";
+ int ELEMENT_LIMIT_DEFAULT = 4096;
+
+ static int elementLimit(RaftProperties properties) {
+ return getInt(properties::getInt, ELEMENT_LIMIT_KEY,
ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(1));
+ }
+ static void setElementLimit(RaftProperties properties, int limit) {
+ setInt(properties::setInt, ELEMENT_LIMIT_KEY, limit, requireMin(1));
+ }
}
- static void setWatchTimeout(RaftProperties properties, TimeDuration
watchTimeout) {
- setTimeDuration(properties::setTimeDuration, WATCH_TIMEOUT_KEY,
watchTimeout);
+
+ interface Watch {
+ String PREFIX = RaftServerConfigKeys.PREFIX + ".watch";
+
+ String ELEMENT_LIMIT_KEY = PREFIX + ".element-limit";
+ int ELEMENT_LIMIT_DEFAULT = 65536;
+ static int elementLimit(RaftProperties properties) {
+ return getInt(properties::getInt, ELEMENT_LIMIT_KEY,
ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(1));
+ }
+ static void setElementLimit(RaftProperties properties, int limit) {
+ setInt(properties::setInt, ELEMENT_LIMIT_KEY, limit, requireMin(1));
+ }
+
+ String TIMEOUT_DENOMINATION_KEY = PREFIX + ".timeout.denomination";
+ TimeDuration TIMEOUT_DENOMINATION_DEFAULT = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
+ static TimeDuration timeoutDenomination(RaftProperties properties) {
+ return
getTimeDuration(properties.getTimeDuration(TIMEOUT_DENOMINATION_DEFAULT.getUnit()),
+ TIMEOUT_DENOMINATION_KEY, TIMEOUT_DENOMINATION_DEFAULT,
getDefaultLog(), requirePositive());
+ }
+ static void setTimeoutDenomination(RaftProperties properties, TimeDuration
watchTimeout) {
+ setTimeDuration(properties::setTimeDuration, TIMEOUT_DENOMINATION_KEY,
watchTimeout);
+ }
+
+ /** Timeout for watch requests. */
+ String TIMEOUT_KEY = PREFIX + ".timeout";
+ TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+ static TimeDuration timeout(RaftProperties properties) {
+ return
getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
+ TIMEOUT_KEY, TIMEOUT_DEFAULT, getDefaultLog(), requirePositive());
+ }
+ static void setTimeout(RaftProperties properties, TimeDuration
watchTimeout) {
+ setTimeDuration(properties::setTimeDuration, TIMEOUT_KEY, watchTimeout);
+ }
}
interface Log {
@@ -119,7 +144,7 @@ public interface RaftServerConfigKeys {
return getInt(properties::getInt, QUEUE_ELEMENT_LIMIT_KEY,
QUEUE_ELEMENT_LIMIT_DEFAULT, getDefaultLog(),
requireMin(1));
}
- static void setElementLimit(RaftProperties properties, int queueSize) {
+ static void setQueueElementLimit(RaftProperties properties, int queueSize)
{
setInt(properties::setInt, QUEUE_ELEMENT_LIMIT_KEY, queueSize,
requireMin(1));
}
@@ -129,7 +154,7 @@ public interface RaftServerConfigKeys {
return getSizeInBytes(properties::getSizeInBytes,
QUEUE_BYTE_LIMIT_KEY, QUEUE_BYTE_LIMIT_DEFAULT, getDefaultLog());
}
- static void setByteLimit(RaftProperties properties, int queueSize) {
+ static void setQueueByteLimit(RaftProperties properties, int queueSize) {
setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize,
requireMin(1));
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 78b56db..844434f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -204,7 +204,7 @@ public class LeaderState {
this.raftLog = state.getLog();
this.currentTerm = state.getCurrentTerm();
processor = new EventProcessor();
- this.pendingRequests = new PendingRequests(server.getId());
+ this.pendingRequests = new PendingRequests(server.getId(), properties);
this.watchRequests = new WatchRequests(server.getId(), properties);
final RaftConfiguration conf = server.getRaftConf();
@@ -294,12 +294,16 @@ public class LeaderState {
return pending;
}
- PendingRequest addPendingRequest(RaftClientRequest request,
TransactionContext entry) {
+ PendingRequests.Permit tryAcquirePendingRequest() {
+ return pendingRequests.tryAcquire();
+ }
+
+ PendingRequest addPendingRequest(PendingRequests.Permit permit,
RaftClientRequest request, TransactionContext entry) {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: addPendingRequest at {}, entry=", server.getId(), request,
ServerProtoUtils.toLogEntryString(entry.getLogEntry()));
}
- return pendingRequests.add(request, entry);
+ return pendingRequests.add(permit, request, entry);
}
CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest
request) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 8847a99..46f0c1f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -17,35 +17,79 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.protocol.*;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.ResourceSemaphore;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
class PendingRequests {
public static final Logger LOG =
LoggerFactory.getLogger(PendingRequests.class);
+ static class Permit {}
+
private static class RequestMap {
private final Object name;
private final ConcurrentMap<Long, PendingRequest> map = new
ConcurrentHashMap<>();
- RequestMap(Object name) {
+ /** Permits to put new requests, always synchronized. */
+ private final Map<Permit, Permit> permits = new HashMap<>();
+ /** Track and limit the number of requests. */
+ private final ResourceSemaphore resource;
+
+ RequestMap(Object name, int capacity) {
this.name = name;
+ this.resource = new ResourceSemaphore(capacity);
+ }
+
+ Permit tryAcquire() {
+ final boolean acquired = resource.tryAcquire();
+ LOG.trace("tryAcquire? {}", acquired);
+ if (!acquired) {
+ return null;
+ }
+ return putPermit();
+ }
+
+ private synchronized Permit putPermit() {
+ if (resource.isClosed()) {
+ return null;
+ }
+ final Permit permit = new Permit();
+ permits.put(permit, permit);
+ return permit;
}
- void put(long index, PendingRequest p) {
+ synchronized PendingRequest put(Permit permit, long index, PendingRequest
p) {
LOG.debug("{}: PendingRequests.put {} -> {}", name, index, p);
+ final Permit removed = permits.remove(permit);
+ if (removed == null) {
+ return null;
+ }
+ Preconditions.assertTrue(removed == permit);
final PendingRequest previous = map.put(index, p);
Preconditions.assertTrue(previous == null);
+ return p;
}
PendingRequest get(long index) {
@@ -57,17 +101,32 @@ class PendingRequests {
PendingRequest remove(long index) {
final PendingRequest r = map.remove(index);
LOG.debug("{}: PendingRequests.remove {} returns {}", name, index, r);
+ if (r == null) {
+ return null;
+ }
+ resource.release();
+ LOG.trace("release");
return r;
}
Collection<TransactionContext> setNotLeaderException(NotLeaderException
nle, Collection<CommitInfoProto> commitInfos) {
+ synchronized (this) {
+ resource.close();
+ permits.clear();
+ }
+
LOG.debug("{}: PendingRequests.setNotLeaderException", name);
- try {
- return map.values().stream()
- .map(p -> p.setNotLeaderException(nle, commitInfos))
- .collect(Collectors.toList());
- } finally {
- map.clear();
+ final List<TransactionContext> transactions = new
ArrayList<>(map.size());
+ for(;;) {
+ final Iterator<Long> i = map.keySet().iterator();
+ if (!i.hasNext()) { // the map is empty
+ return transactions;
+ }
+
+ final PendingRequest pending = map.remove(i.next());
+ if (pending != null) {
+ transactions.add(pending.setNotLeaderException(nle, commitInfos));
+ }
}
}
}
@@ -76,19 +135,22 @@ class PendingRequests {
private final String name;
private final RequestMap pendingRequests;
- PendingRequests(RaftPeerId id) {
+ PendingRequests(RaftPeerId id, RaftProperties properties) {
this.name = id + "-" + getClass().getSimpleName();
- this.pendingRequests = new RequestMap(id);
+ this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties));
+ }
+
+ Permit tryAcquire() {
+ return pendingRequests.tryAcquire();
}
- PendingRequest add(RaftClientRequest request, TransactionContext entry) {
+ PendingRequest add(Permit permit, RaftClientRequest request,
TransactionContext entry) {
// externally synced for now
Preconditions.assertTrue(request.is(RaftClientRequestProto.TypeCase.WRITE));
final long index = entry.getLogEntry().getIndex();
LOG.debug("{}: addPendingRequest at index={}, request={}", name, index,
request);
final PendingRequest pending = new PendingRequest(index, request, entry);
- pendingRequests.put(index, pending);
- return pending;
+ return pendingRequests.put(permit, index, pending);
}
PendingRequest addConfRequest(SetConfigurationRequest request) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f58ba9d..f242aac 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
@@ -485,6 +486,11 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
// append the message to its local log
final LeaderState leaderState = role.getLeaderStateNonNull();
+ final PendingRequests.Permit permit =
leaderState.tryAcquirePendingRequest();
+ if (permit == null) {
+ return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
+ "Failed to acquire a pending write request in " + getId() + " for
" + request));
+ }
try {
state.appendLog(context);
} catch (StateMachineException e) {
@@ -493,14 +499,18 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
RaftClientReply exceptionReply = new RaftClientReply(request, e,
getCommitInfos());
cacheEntry.failWithReply(exceptionReply);
// leader will step down here
- if (isLeader() && leaderState != null) {
+ if (isLeader()) {
leaderState.submitStepDownEvent();
}
return CompletableFuture.completedFuture(exceptionReply);
}
// put the request into the pending queue
- pending = leaderState.addPendingRequest(request, context);
+ pending = leaderState.addPendingRequest(permit, request, context);
+ if (pending == null) {
+ return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
+ "Failed to add a pending write request in " + getId() + " for " +
request));
+ }
leaderState.notifySenders();
}
return pending.getFuture();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 391b1ea..92197e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
@@ -73,36 +74,47 @@ class WatchRequests {
private final ReplicationLevel replication;
private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
+ private final ResourceSemaphore resource;
private volatile long index; //Invariant: q.isEmpty() or index < any element q
- WatchQueue(ReplicationLevel replication) {
+ WatchQueue(ReplicationLevel replication, int elementLimit) {
this.replication = replication;
+ this.resource = new ResourceSemaphore(elementLimit);
}
long getIndex() {
return index;
}
- PendingWatch add(RaftClientRequest request) {
+ CompletableFuture<Void> add(RaftClientRequest request) {
final long currentTime = Timestamp.currentTimeNanos();
final long roundUp =
watchTimeoutDenominationNanos.roundUpNanos(currentTime);
final PendingWatch pending = new
PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp));
+ final PendingWatch computed;
synchronized (this) {
- if (pending.getIndex() > getIndex()) { // compare again synchronized
- final PendingWatch previous = q.putIfAbsent(pending, pending);
- if (previous != null) {
- return previous;
- }
- } else {
+ if (pending.getIndex() <= getIndex()) { // compare again synchronized
+ // watch condition already satisfied
return null;
}
+ computed = q.compute(pending, (key, old) -> old != null? old:
resource.tryAcquire()? pending: null);
}
+ if (computed == null) {
+ // failed to acquire
+ return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
+ "Failed to acquire a pending watch request in " + name + " for " +
request));
+ }
+ if (computed != pending) {
+ // already exists in q
+ return computed.getFuture();
+ }
+
+ // newly added to q
final TimeDuration timeout = watchTimeoutNanos.apply(duration ->
duration + roundUp - currentTime);
scheduler.onTimeout(timeout, () -> handleTimeout(request, pending),
LOG, () -> name + ": Failed to timeout " + request);
- return pending;
+ return pending.getFuture();
}
void handleTimeout(RaftClientRequest request, PendingWatch pending) {
@@ -119,6 +131,7 @@ class WatchRequests {
return false;
}
Preconditions.assertTrue(removed == pending);
+ resource.release();
return true;
}
@@ -146,6 +159,7 @@ class WatchRequests {
pending.getFuture().completeExceptionally(e);
}
q.clear();
+ resource.close();
}
}
@@ -159,24 +173,25 @@ class WatchRequests {
WatchRequests(Object name, RaftProperties properties) {
this.name = name + "-" + getClass().getSimpleName();
- final TimeDuration watchTimeout =
RaftServerConfigKeys.watchTimeout(properties);
+ final TimeDuration watchTimeout =
RaftServerConfigKeys.Watch.timeout(properties);
this.watchTimeoutNanos = watchTimeout.to(TimeUnit.NANOSECONDS);
- final TimeDuration watchTimeoutDenomination =
RaftServerConfigKeys.watchTimeoutDenomination(properties);
+ final TimeDuration watchTimeoutDenomination =
RaftServerConfigKeys.Watch.timeoutDenomination(properties);
this.watchTimeoutDenominationNanos =
watchTimeoutDenomination.to(TimeUnit.NANOSECONDS);
Preconditions.assertTrue(watchTimeoutNanos.getDuration() %
watchTimeoutDenominationNanos.getDuration() == 0L,
() -> "watchTimeout (=" + watchTimeout + ") is not a multiple of
watchTimeoutDenomination (="
+ watchTimeoutDenomination + ").");
- Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new
WatchQueue(r)));
+ final int elementLimit =
RaftServerConfigKeys.Watch.elementLimit(properties);
+ Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new
WatchQueue(r, elementLimit)));
}
CompletableFuture<Void> add(RaftClientRequest request) {
final WatchRequestTypeProto watch = request.getType().getWatch();
final WatchQueue queue = queues.get(watch.getReplication());
if (watch.getIndex() > queue.getIndex()) { // compare without
synchronization
- final PendingWatch pending = queue.add(request);
- if (pending != null) {
- return pending.getFuture();
+ final CompletableFuture<Void> future = queue.add(request);
+ if (future != null) {
+ return future;
}
}
return CompletableFuture.completedFuture(null);
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 52481d6..de7dc0e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -99,6 +99,12 @@ public abstract class MiniRaftCluster implements Closeable {
return properties.get();
}
+ default RaftProperties setStateMachine(Class<? extends StateMachine>
stateMachineClass) {
+ final RaftProperties p = getProperties();
+ p.setClass(STATEMACHINE_CLASS_KEY, stateMachineClass,
StateMachine.class);
+ return p;
+ }
+
default CLUSTER newCluster(int numPeers) {
return getFactory().newCluster(numPeers, getProperties());
}
@@ -638,35 +644,20 @@ public abstract class MiniRaftCluster implements
Closeable {
return createClient(null, g);
}
- public RaftClient createClientWithLeader() {
- return createClient(getLeader().getId(), group);
- }
-
- public RaftClient createClientWithFollower() {
- return createClient(getFollowers().get(0).getId(), group);
- }
-
public RaftClient createClient(RaftPeerId leaderId) {
return createClient(leaderId, group);
}
public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy)
{
- return createClient(leaderId, group, null, retryPolicy);
+ return createClient(leaderId, group, retryPolicy);
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
- return createClient(leaderId, group, null, getDefaultRetryPolicy());
- }
-
- public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
- ClientId clientId) {
- return createClient(leaderId, group, clientId, getDefaultRetryPolicy());
+ return createClient(leaderId, group, getDefaultRetryPolicy());
}
- public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
- ClientId clientId, RetryPolicy retryPolicy) {
+ public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
RetryPolicy retryPolicy) {
RaftClient.Builder builder = RaftClient.newBuilder()
- .setClientId(clientId)
.setRaftGroup(group)
.setLeaderId(leaderId)
.setProperties(properties)
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
new file mode 100644
index 0000000..bb1ccfb
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.util.LogUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public abstract class RequestLimitAsyncBaseTest<CLUSTER extends
MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ static {
+ LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ RaftServerTestUtil.setPendingRequestsLogLevel(Level.DEBUG);
+ }
+
+ private final int writeElementLimit = 5;
+ private final int watchElementLimit = 2;
+ {
+ final RaftProperties p = setStateMachine(SimpleStateMachine4Testing.class);
+ RaftServerConfigKeys.Write.setElementLimit(p, writeElementLimit);
+ RaftServerConfigKeys.Watch.setElementLimit(p, watchElementLimit);
+
+ RaftServerConfigKeys.Rpc.setRequestTimeout(p, FIVE_SECONDS);
+ RaftClientConfigKeys.Rpc.setRequestTimeout(p, FIVE_SECONDS);
+ }
+
+ @Test
+ public void testWriteElementLimit() throws Exception {
+ runWithSameCluster(1, this::runTestWriteElementLimit);
+ }
+
+ void runTestWriteElementLimit(CLUSTER cluster) throws Exception {
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+ try (RaftClient c1 = cluster.createClient(leader.getId())) {
+ { // send first message to make sure the cluster is working
+ final SimpleMessage message = new SimpleMessage("first");
+ final CompletableFuture<RaftClientReply> future =
c1.sendAsync(message);
+ final RaftClientReply reply = getWithDefaultTimeout(future);
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ // collecting futures returned from StateMachine.applyTransaction
+ final BlockingQueue<Runnable> toBeCompleted =
SimpleStateMachine4Testing.get(leader).collecting().enable(
+ SimpleStateMachine4Testing.Collecting.Type.APPLY_TRANSACTION);
+
+ // send write requests up to the limit
+ final List<CompletableFuture<RaftClientReply>> writeFutures = new
ArrayList<>();
+ for (int i = 0; i < writeElementLimit; i++) {
+ final SimpleMessage message = new SimpleMessage("m" + i);
+ writeFutures.add(c1.sendAsync(message));
+ }
+
+ // send watch requests up to the limit
+ final long watchBase = 1000; //watch a large index so that it won't
complete
+ for (int i = 0; i < watchElementLimit; i++) {
+ c1.sendWatchAsync(watchBase + i, ReplicationLevel.ALL);
+ }
+
+ // sleep to make sure that all the requests were sent
+ HUNDRED_MILLIS.sleep();
+
+ try(RaftClient c2 = cluster.createClient(leader.getId(),
RetryPolicies.noRetry())) {
+ // more write requests should get ResourceUnavailableException
+ final SimpleMessage message = new SimpleMessage("err");
+ testFailureCase("send should fail", () -> c2.send(message),
+ ResourceUnavailableException.class);
+ testFailureCase("sendAsync should fail", () ->
c2.sendAsync(message).get(),
+ ExecutionException.class, ResourceUnavailableException.class);
+
+ // more watch requests should get ResourceUnavailableException
+ final long watchIndex = watchBase + watchElementLimit;
+ testFailureCase("sendWatch should fail", () ->
c2.sendWatch(watchIndex, ReplicationLevel.ALL),
+ ResourceUnavailableException.class);
+ testFailureCase("sendWatchAsync should fail", () ->
c2.sendWatchAsync(watchIndex, ReplicationLevel.ALL).get(),
+ ExecutionException.class, ResourceUnavailableException.class);
+ }
+
+ // complete futures from applyTransaction
+ toBeCompleted.forEach(Runnable::run);
+ // check replies
+ for(CompletableFuture<RaftClientReply> f : writeFutures) {
+ final RaftClientReply reply = getWithDefaultTimeout(f);
+ Assert.assertTrue(reply.isSuccess());
+ }
+ }
+ }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 63aad09..86b6e4c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -331,14 +331,14 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
@Test
public void testWatchRequestTimeout() throws Exception {
final RaftProperties p = getProperties();
- RaftServerConfigKeys.setWatchTimeout(p, TimeDuration.valueOf(500,
TimeUnit.MILLISECONDS));
- RaftServerConfigKeys.setWatchTimeoutDenomination(p,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
+ RaftServerConfigKeys.Watch.setTimeout(p, TimeDuration.valueOf(500,
TimeUnit.MILLISECONDS));
+ RaftServerConfigKeys.Watch.setTimeoutDenomination(p,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
try {
runWithNewCluster(NUM_SERVERS,
cluster -> runTest(WatchRequestTests::runTestWatchRequestTimeout,
cluster, LOG));
} finally {
- RaftServerConfigKeys.setWatchTimeout(p,
RaftServerConfigKeys.WATCH_TIMEOUT_DEFAULT);
- RaftServerConfigKeys.setWatchTimeoutDenomination(p,
RaftServerConfigKeys.WATCH_TIMEOUT_DENOMINATION_DEFAULT);
+ RaftServerConfigKeys.Watch.setTimeout(p,
RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
+ RaftServerConfigKeys.Watch.setTimeoutDenomination(p,
RaftServerConfigKeys.Watch.TIMEOUT_DENOMINATION_DEFAULT);
}
}
@@ -347,8 +347,9 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
final MiniRaftCluster cluster = p.cluster;
final int numMessages = p.numMessages;
- final TimeDuration watchTimeout =
RaftServerConfigKeys.watchTimeout(cluster.getProperties());
- final TimeDuration watchTimeoutDenomination =
RaftServerConfigKeys.watchTimeoutDenomination(cluster.getProperties());
+ final RaftProperties properties = cluster.getProperties();
+ final TimeDuration watchTimeout =
RaftServerConfigKeys.Watch.timeout(properties);
+ final TimeDuration watchTimeoutDenomination =
RaftServerConfigKeys.Watch.timeoutDenomination(properties);
// blockStartTransaction of the leader so that no transaction can be
committed MAJORITY
final RaftServerImpl leader = cluster.getLeader();
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 4c3f06d..2488540 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -42,6 +42,9 @@ public class RaftServerTestUtil {
public static void setWatchRequestsLogLevel(Level level) {
LogUtils.setLogLevel(WatchRequests.LOG, level);
}
+ public static void setPendingRequestsLogLevel(Level level) {
+ LogUtils.setLogLevel(PendingRequests.LOG, level);
+ }
public static void waitAndCheckNewConf(MiniRaftCluster cluster,
RaftPeer[] peers, int numOfRemovedPeers, Collection<RaftPeerId>
deadPeers)
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 6a8c532..ac3813f 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -55,8 +55,10 @@ import java.util.EnumMap;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -90,6 +92,36 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
private volatile boolean running = true;
+ public static class Collecting {
+ public enum Type {
+ APPLY_TRANSACTION
+ }
+
+ private final EnumMap<Type, BlockingQueue<Runnable>> map = new
EnumMap<>(Type.class);
+
+ BlockingQueue<Runnable> get(Type type) {
+ return map.get(type);
+ }
+
+ public BlockingQueue<Runnable> enable(Type type) {
+ final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
+ final BlockingQueue<Runnable> previous = map.put(type, q);
+ Preconditions.assertNull(previous, "previous");
+ return q;
+ }
+
+ <T> CompletableFuture<T> collect(Type type, T value) {
+ final BlockingQueue<Runnable> q = get(type);
+ if (q == null) {
+ return CompletableFuture.completedFuture(value);
+ }
+
+ final CompletableFuture<T> future = new CompletableFuture<>();
+ final boolean offered = q.offer(() -> future.complete(value));
+ Preconditions.assertTrue(offered);
+ return future;
+ }
+ }
static class Blocking {
enum Type {
@@ -122,6 +154,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
}
private final Blocking blocking = new Blocking();
+ private final Collecting collecting = new Collecting();
private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
private volatile RoleInfoProto slownessInfo = null;
private volatile RoleInfoProto leaderElectionTimeoutInfo = null;
@@ -141,6 +174,10 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
});
}
+ public Collecting collecting() {
+ return collecting;
+ }
+
public RoleInfoProto getSlownessInfo() {
return slownessInfo;
}
@@ -193,8 +230,9 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
put(entry);
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
- return CompletableFuture.completedFuture(
- new SimpleMessage(entry.getIndex() + " OK"));
+
+ final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
+ return collecting.collect(Collecting.Type.APPLY_TRANSACTION, m);
}
@Override
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRequestLimitAsyncWithGrpc.java
similarity index 75%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
copy to
ratis-test/src/test/java/org/apache/ratis/grpc/TestRequestLimitAsyncWithGrpc.java
index 0294bb4..19d71fe 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRequestLimitAsyncWithGrpc.java
@@ -15,13 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.protocol;
+package org.apache.ratis.grpc;
-/**
- * Retry failure as per the retryPolicy defined.
- */
-public class RaftRetryFailureException extends RaftException {
- public RaftRetryFailureException(String message, Throwable cause) {
- super(message, cause);
- }
-}
\ No newline at end of file
+import org.apache.ratis.RequestLimitAsyncBaseTest;
+
+public class TestRequestLimitAsyncWithGrpc
+ extends RequestLimitAsyncBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+}