This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c7b35c3  RATIS-529. Leader should limit pending requests to avoid OOM. 
Contributed by Tsz Wo Nicholas Sze.
c7b35c3 is described below

commit c7b35c3f43fd52bf7669f19fc86c1b05b71bef0e
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri May 10 00:54:22 2019 +0530

    RATIS-529. Leader should limit pending requests to avoid OOM. Contributed 
by Tsz Wo Nicholas Sze.
---
 .../apache/ratis/client/impl/RaftClientImpl.java   |  31 +++---
 .../ratis/protocol/RaftRetryFailureException.java  |   9 +-
 .../ResourceUnavailableException.java}             |  14 ++-
 .../org/apache/ratis/util/ResourceSemaphore.java   |  52 +++++++++
 .../java/org/apache/ratis/util/SlidingWindow.java  |   7 +-
 .../java/org/apache/ratis/util/TimeDuration.java   |  20 +++-
 .../src/test/java/org/apache/ratis/BaseTest.java   |  12 ++
 .../main/java/org/apache/ratis/grpc/GrpcUtil.java  |   7 +-
 .../apache/ratis/server/RaftServerConfigKeys.java  |  67 +++++++----
 .../org/apache/ratis/server/impl/LeaderState.java  |  10 +-
 .../apache/ratis/server/impl/PendingRequests.java  |  92 ++++++++++++---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  14 ++-
 .../apache/ratis/server/impl/WatchRequests.java    |  45 +++++---
 .../java/org/apache/ratis/MiniRaftCluster.java     |  27 ++---
 .../apache/ratis/RequestLimitAsyncBaseTest.java    | 124 +++++++++++++++++++++
 .../java/org/apache/ratis/WatchRequestTests.java   |  13 ++-
 .../ratis/server/impl/RaftServerTestUtil.java      |   3 +
 .../statemachine/SimpleStateMachine4Testing.java   |  42 ++++++-
 .../ratis/grpc/TestRequestLimitAsyncWithGrpc.java  |  16 ++-
 19 files changed, 487 insertions(+), 118 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ddcad0e..9caf95e 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -327,18 +327,24 @@ final class RaftClientImpl implements RaftClient {
     
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
   }
 
-  private RaftClientReply sendRequestWithRetry(
-      Supplier<RaftClientRequest> supplier)
-      throws InterruptedIOException, StateMachineException, 
GroupMismatchException {
-    for(int attemptCount = 0;; attemptCount++) {
+  private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> 
supplier) throws IOException {
+    for(int attemptCount = 1;; attemptCount++) {
       final RaftClientRequest request = supplier.get();
-      final RaftClientReply reply = sendRequest(request);
-      if (reply != null) {
-        return reply;
+      IOException ioe = null;
+      try {
+        final RaftClientReply reply = sendRequest(request);
+        if (reply != null) {
+          return reply;
+        }
+      } catch (GroupMismatchException | StateMachineException e) {
+        throw e;
+      } catch (IOException e) {
+        ioe = e;
       }
       if (!retryPolicy.shouldRetry(attemptCount, request)) {
-        return null;
+        throw (IOException)noMoreRetries(request, attemptCount, retryPolicy, 
ioe);
       }
+
       try {
         retryPolicy.getSleepTime(attemptCount, request).sleep();
       } catch (InterruptedException e) {
@@ -385,8 +391,7 @@ final class RaftClientImpl implements RaftClient {
     if (attemptCount == 1 && throwable != null) {
       return throwable;
     }
-    return new RaftRetryFailureException(
-        "Failed " + request + " for " + (attemptCount-1) + " attempts with " + 
policy, throwable);
+    return new RaftRetryFailureException(request, attemptCount, policy, 
throwable);
   }
 
   private void handleAsyncRetryFailure(RaftClientRequest request, int 
attemptCount, Throwable throwable) {
@@ -397,16 +402,16 @@ final class RaftClientImpl implements RaftClient {
     
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
   }
 
-  private RaftClientReply sendRequest(RaftClientRequest request)
-      throws StateMachineException, GroupMismatchException {
+  private RaftClientReply sendRequest(RaftClientRequest request) throws 
IOException {
     LOG.debug("{}: send {}", clientId, request);
-    RaftClientReply reply = null;
+    RaftClientReply reply;
     try {
       reply = clientRpc.sendRequest(request);
     } catch (GroupMismatchException gme) {
       throw gme;
     } catch (IOException ioe) {
       handleIOException(request, ioe, null, false);
+      throw ioe;
     }
     LOG.debug("{}: receive {}", clientId, reply);
     reply = handleNotLeaderException(request, reply, false);
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
index 0294bb4..6221b7c 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
@@ -17,11 +17,14 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.retry.RetryPolicy;
+
 /**
- * Retry failure as per the retryPolicy defined.
+ * Retry failure as per the {@link RetryPolicy} defined.
  */
 public class RaftRetryFailureException extends RaftException {
-  public RaftRetryFailureException(String message, Throwable cause) {
-    super(message, cause);
+  public RaftRetryFailureException(
+      RaftClientRequest request, int attemptCount, RetryPolicy retryPolicy, 
Throwable cause) {
+    super("Failed " + request + " for " + attemptCount + " attempts with " + 
retryPolicy, cause);
   }
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ResourceUnavailableException.java
similarity index 74%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
copy to 
ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ResourceUnavailableException.java
index 0294bb4..92b4abb 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ResourceUnavailableException.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.protocol;
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftException;
 
 /**
- * Retry failure as per the retryPolicy defined.
+ * A particular resource is unavailable.
  */
-public class RaftRetryFailureException extends RaftException {
-  public RaftRetryFailureException(String message, Throwable cause) {
-    super(message, cause);
+public class ResourceUnavailableException extends RaftException {
+  public ResourceUnavailableException(String message) {
+    super(message);
   }
-}
\ No newline at end of file
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
new file mode 100644
index 0000000..b76fd77
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link Semaphore} with an element limit for tracking resources.
+ *
+ * After {@link #close()}, the resource becomes unavailable, i.e. any acquire 
will not succeed.
+ */
+public class ResourceSemaphore extends Semaphore {
+  private final int elementLimit;
+  private final AtomicBoolean isClosed = new AtomicBoolean();
+
+  public ResourceSemaphore(int elementLimit) {
+    super(elementLimit, true);
+    this.elementLimit = elementLimit;
+  }
+
+  /** Close the resource. */
+  public void close() {
+    if (isClosed.compareAndSet(false, true)) {
+      reducePermits(elementLimit);
+    }
+  }
+
+  public boolean isClosed() {
+    return isClosed.get();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ":elementLimit=" + elementLimit + 
",closed?" + isClosed;
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java 
b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 8d5c1b6..8a3237a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -393,11 +393,12 @@ public interface SlidingWindow {
 
     private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
       for(REQUEST r : requests) {
-        if (r.getSeqNum() != nextToProcess) {
+        if (r.getSeqNum() > nextToProcess) {
           return;
+        } else if (r.getSeqNum() == nextToProcess) {
+          processingMethod.accept(r);
+          nextToProcess++;
         }
-        processingMethod.accept(r);
-        nextToProcess++;
       }
     }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index b5205f3..51dfe7a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.LongUnaryOperator;
 
 /**
@@ -203,7 +204,24 @@ public final class TimeDuration implements 
Comparable<TimeDuration> {
 
   /** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. 
*/
   public void sleep() throws InterruptedException {
-    unit.sleep(duration);
+    sleep(null);
+  }
+
+  public void sleep(Consumer<Object> log) throws InterruptedException {
+    if (log != null) {
+      log.accept(StringUtils.stringSupplierAsObject(() -> "Start sleeping " + 
this));
+    }
+    try {
+      unit.sleep(duration);
+      if (log != null) {
+        log.accept(StringUtils.stringSupplierAsObject(() -> "Completed 
sleeping " + this));
+      }
+    } catch(InterruptedException ie) {
+      if (log != null) {
+        log.accept(StringUtils.stringSupplierAsObject(() -> "Interrupted 
sleeping " + this));
+      }
+      throw ie;
+    }
   }
 
   @Override
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java 
b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 278f9ca..6598ba5 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -37,6 +37,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -46,6 +47,7 @@ public abstract class BaseTest {
 
   public static final TimeDuration HUNDRED_MILLIS = TimeDuration.valueOf(100, 
TimeUnit.MILLISECONDS);
   public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
+  public static final TimeDuration FIVE_SECONDS = TimeDuration.valueOf(5, 
TimeUnit.SECONDS);
 
   {
     LogUtils.setLogLevel(ConfUtils.LOG, Level.WARN);
@@ -117,6 +119,9 @@ public abstract class BaseTest {
       String description, CheckedRunnable<?> testCode,
       Class<? extends Throwable> expectedThrowableClass, Logger log,
       Class<? extends Throwable>... expectedCauseClasses) {
+    if (log != null) {
+      log.info("run '{}'", description);
+    }
     try {
       testCode.run();
     } catch (Throwable t) {
@@ -139,6 +144,9 @@ public abstract class BaseTest {
       String description, Supplier<CompletableFuture<?>> testCode,
       Class<? extends Throwable> expectedThrowableClass, Logger log,
       Class<? extends Throwable>... expectedCauseClasses) {
+    if (log != null) {
+      log.info("run '{}'", description);
+    }
     try {
       testCode.get().join();
     } catch (Throwable t) {
@@ -155,4 +163,8 @@ public abstract class BaseTest {
       Class<? extends Throwable>... expectedCauseClasses) {
     return testFailureCaseAsync(description, testCode, expectedThrowableClass, 
LOG, expectedCauseClasses);
   }
+
+  static <T> T getWithDefaultTimeout(Future<T> future) throws Exception {
+    return future.get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
+  }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index ce524db..082d0de 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc;
 
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.ServerNotReadyException;
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.grpc.Metadata;
 import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
@@ -76,6 +77,11 @@ public interface GrpcUtil {
   }
 
   static IOException tryUnwrapException(StatusRuntimeException se) {
+    final Status status = se.getStatus();
+    if (status != null && status.getCode() == Status.Code.DEADLINE_EXCEEDED) {
+      return new TimeoutIOException(status.getDescription(), se);
+    }
+
     final Metadata trailers = se.getTrailers();
     if (trailers == null) {
       return null;
@@ -90,7 +96,6 @@ public interface GrpcUtil {
       }
     }
 
-    final Status status = se.getStatus();
     if (status != null) {
       final String className = trailers.get(EXCEPTION_TYPE_KEY);
       if (className != null) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index b4f7faf..e643bf4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -78,27 +78,52 @@ public interface RaftServerConfigKeys {
 
   }
 
-  String WATCH_TIMEOUT_DENOMINATION_KEY = PREFIX + 
".watch.timeout.denomination";
-  TimeDuration WATCH_TIMEOUT_DENOMINATION_DEFAULT = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
-  static TimeDuration watchTimeoutDenomination(RaftProperties properties) {
-    return 
getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DENOMINATION_DEFAULT.getUnit()),
-        WATCH_TIMEOUT_DENOMINATION_KEY, WATCH_TIMEOUT_DENOMINATION_DEFAULT, 
getDefaultLog(), requirePositive());
-  }
-  static void setWatchTimeoutDenomination(RaftProperties properties, 
TimeDuration watchTimeout) {
-    setTimeDuration(properties::setTimeDuration, 
WATCH_TIMEOUT_DENOMINATION_KEY, watchTimeout);
-  }
+  interface Write {
+    String PREFIX = RaftServerConfigKeys.PREFIX + ".write";
 
-  /**
-   * Timeout for watch requests.
-   */
-  String WATCH_TIMEOUT_KEY = PREFIX + ".watch.timeout";
-  TimeDuration WATCH_TIMEOUT_DEFAULT = TimeDuration.valueOf(10, 
TimeUnit.SECONDS);
-  static TimeDuration watchTimeout(RaftProperties properties) {
-    return 
getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DEFAULT.getUnit()),
-        WATCH_TIMEOUT_KEY, WATCH_TIMEOUT_DEFAULT, getDefaultLog(), 
requirePositive());
+    String ELEMENT_LIMIT_KEY = PREFIX + ".element-limit";
+    int ELEMENT_LIMIT_DEFAULT = 4096;
+
+    static int elementLimit(RaftProperties properties) {
+      return getInt(properties::getInt, ELEMENT_LIMIT_KEY, 
ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(1));
+    }
+    static void setElementLimit(RaftProperties properties, int limit) {
+      setInt(properties::setInt, ELEMENT_LIMIT_KEY, limit, requireMin(1));
+    }
   }
-  static void setWatchTimeout(RaftProperties properties, TimeDuration 
watchTimeout) {
-    setTimeDuration(properties::setTimeDuration, WATCH_TIMEOUT_KEY, 
watchTimeout);
+
+  interface Watch {
+    String PREFIX = RaftServerConfigKeys.PREFIX + ".watch";
+
+    String ELEMENT_LIMIT_KEY = PREFIX + ".element-limit";
+    int ELEMENT_LIMIT_DEFAULT = 65536;
+    static int elementLimit(RaftProperties properties) {
+      return getInt(properties::getInt, ELEMENT_LIMIT_KEY, 
ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(1));
+    }
+    static void setElementLimit(RaftProperties properties, int limit) {
+      setInt(properties::setInt, ELEMENT_LIMIT_KEY, limit, requireMin(1));
+    }
+
+    String TIMEOUT_DENOMINATION_KEY = PREFIX + ".timeout.denomination";
+    TimeDuration TIMEOUT_DENOMINATION_DEFAULT = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
+    static TimeDuration timeoutDenomination(RaftProperties properties) {
+      return 
getTimeDuration(properties.getTimeDuration(TIMEOUT_DENOMINATION_DEFAULT.getUnit()),
+          TIMEOUT_DENOMINATION_KEY, TIMEOUT_DENOMINATION_DEFAULT, 
getDefaultLog(), requirePositive());
+    }
+    static void setTimeoutDenomination(RaftProperties properties, TimeDuration 
watchTimeout) {
+      setTimeDuration(properties::setTimeDuration, TIMEOUT_DENOMINATION_KEY, 
watchTimeout);
+    }
+
+    /** Timeout for watch requests. */
+    String TIMEOUT_KEY = PREFIX + ".timeout";
+    TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+    static TimeDuration timeout(RaftProperties properties) {
+      return 
getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
+          TIMEOUT_KEY, TIMEOUT_DEFAULT, getDefaultLog(), requirePositive());
+    }
+    static void setTimeout(RaftProperties properties, TimeDuration 
watchTimeout) {
+      setTimeDuration(properties::setTimeDuration, TIMEOUT_KEY, watchTimeout);
+    }
   }
 
   interface Log {
@@ -119,7 +144,7 @@ public interface RaftServerConfigKeys {
       return getInt(properties::getInt, QUEUE_ELEMENT_LIMIT_KEY, 
QUEUE_ELEMENT_LIMIT_DEFAULT, getDefaultLog(),
           requireMin(1));
     }
-    static void setElementLimit(RaftProperties properties, int queueSize) {
+    static void setQueueElementLimit(RaftProperties properties, int queueSize) 
{
       setInt(properties::setInt, QUEUE_ELEMENT_LIMIT_KEY, queueSize, 
requireMin(1));
     }
 
@@ -129,7 +154,7 @@ public interface RaftServerConfigKeys {
       return getSizeInBytes(properties::getSizeInBytes,
           QUEUE_BYTE_LIMIT_KEY, QUEUE_BYTE_LIMIT_DEFAULT, getDefaultLog());
     }
-    static void setByteLimit(RaftProperties properties, int queueSize) {
+    static void setQueueByteLimit(RaftProperties properties, int queueSize) {
       setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize, 
requireMin(1));
     }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 78b56db..844434f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -204,7 +204,7 @@ public class LeaderState {
     this.raftLog = state.getLog();
     this.currentTerm = state.getCurrentTerm();
     processor = new EventProcessor();
-    this.pendingRequests = new PendingRequests(server.getId());
+    this.pendingRequests = new PendingRequests(server.getId(), properties);
     this.watchRequests = new WatchRequests(server.getId(), properties);
 
     final RaftConfiguration conf = server.getRaftConf();
@@ -294,12 +294,16 @@ public class LeaderState {
     return pending;
   }
 
-  PendingRequest addPendingRequest(RaftClientRequest request, 
TransactionContext entry) {
+  PendingRequests.Permit tryAcquirePendingRequest() {
+    return pendingRequests.tryAcquire();
+  }
+
+  PendingRequest addPendingRequest(PendingRequests.Permit permit, 
RaftClientRequest request, TransactionContext entry) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: addPendingRequest at {}, entry=", server.getId(), request,
           ServerProtoUtils.toLogEntryString(entry.getLogEntry()));
     }
-    return pendingRequests.add(request, entry);
+    return pendingRequests.add(permit, request, entry);
   }
 
   CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest 
request) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 8847a99..46f0c1f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -17,35 +17,79 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.protocol.*;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.ResourceSemaphore;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 class PendingRequests {
   public static final Logger LOG = 
LoggerFactory.getLogger(PendingRequests.class);
 
+  static class Permit {}
+
   private static class RequestMap {
     private final Object name;
     private final ConcurrentMap<Long, PendingRequest> map = new 
ConcurrentHashMap<>();
 
-    RequestMap(Object name) {
+    /** Permits to put new requests, always synchronized. */
+    private final Map<Permit, Permit> permits = new HashMap<>();
+    /** Track and limit the number of requests. */
+    private final ResourceSemaphore resource;
+
+    RequestMap(Object name, int capacity) {
       this.name = name;
+      this.resource = new ResourceSemaphore(capacity);
+    }
+
+    Permit tryAcquire() {
+      final boolean acquired = resource.tryAcquire();
+      LOG.trace("tryAcquire? {}", acquired);
+      if (!acquired) {
+        return null;
+      }
+      return putPermit();
+    }
+
+    private synchronized Permit putPermit() {
+      if (resource.isClosed()) {
+        return null;
+      }
+      final Permit permit = new Permit();
+      permits.put(permit, permit);
+      return permit;
     }
 
-    void put(long index, PendingRequest p) {
+    synchronized PendingRequest put(Permit permit, long index, PendingRequest 
p) {
       LOG.debug("{}: PendingRequests.put {} -> {}", name, index, p);
+      final Permit removed = permits.remove(permit);
+      if (removed == null) {
+        return null;
+      }
+      Preconditions.assertTrue(removed == permit);
       final PendingRequest previous = map.put(index, p);
       Preconditions.assertTrue(previous == null);
+      return p;
     }
 
     PendingRequest get(long index) {
@@ -57,17 +101,32 @@ class PendingRequests {
     PendingRequest remove(long index) {
       final PendingRequest r = map.remove(index);
       LOG.debug("{}: PendingRequests.remove {} returns {}", name, index, r);
+      if (r == null) {
+        return null;
+      }
+      resource.release();
+      LOG.trace("release");
       return r;
     }
 
     Collection<TransactionContext> setNotLeaderException(NotLeaderException 
nle, Collection<CommitInfoProto> commitInfos) {
+      synchronized (this) {
+        resource.close();
+        permits.clear();
+      }
+
       LOG.debug("{}: PendingRequests.setNotLeaderException", name);
-      try {
-        return map.values().stream()
-            .map(p -> p.setNotLeaderException(nle, commitInfos))
-            .collect(Collectors.toList());
-      } finally {
-        map.clear();
+      final List<TransactionContext> transactions = new 
ArrayList<>(map.size());
+      for(;;) {
+        final Iterator<Long> i = map.keySet().iterator();
+        if (!i.hasNext()) { // the map is empty
+          return transactions;
+        }
+
+        final PendingRequest pending = map.remove(i.next());
+        if (pending != null) {
+          transactions.add(pending.setNotLeaderException(nle, commitInfos));
+        }
       }
     }
   }
@@ -76,19 +135,22 @@ class PendingRequests {
   private final String name;
   private final RequestMap pendingRequests;
 
-  PendingRequests(RaftPeerId id) {
+  PendingRequests(RaftPeerId id, RaftProperties properties) {
     this.name = id + "-" + getClass().getSimpleName();
-    this.pendingRequests = new RequestMap(id);
+    this.pendingRequests = new RequestMap(id, 
RaftServerConfigKeys.Write.elementLimit(properties));
+  }
+
+  Permit tryAcquire() {
+    return pendingRequests.tryAcquire();
   }
 
-  PendingRequest add(RaftClientRequest request, TransactionContext entry) {
+  PendingRequest add(Permit permit, RaftClientRequest request, 
TransactionContext entry) {
     // externally synced for now
     
Preconditions.assertTrue(request.is(RaftClientRequestProto.TypeCase.WRITE));
     final long index = entry.getLogEntry().getIndex();
     LOG.debug("{}: addPendingRequest at index={}, request={}", name, index, 
request);
     final PendingRequest pending = new PendingRequest(index, request, entry);
-    pendingRequests.put(index, pending);
-    return pending;
+    return pendingRequests.put(permit, index, pending);
   }
 
   PendingRequest addConfRequest(SetConfigurationRequest request) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f58ba9d..f242aac 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
@@ -485,6 +486,11 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
       // append the message to its local log
       final LeaderState leaderState = role.getLeaderStateNonNull();
+      final PendingRequests.Permit permit = 
leaderState.tryAcquirePendingRequest();
+      if (permit == null) {
+        return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
+            "Failed to acquire a pending write request in " + getId() + " for 
" + request));
+      }
       try {
         state.appendLog(context);
       } catch (StateMachineException e) {
@@ -493,14 +499,18 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         RaftClientReply exceptionReply = new RaftClientReply(request, e, 
getCommitInfos());
         cacheEntry.failWithReply(exceptionReply);
         // leader will step down here
-        if (isLeader() && leaderState != null) {
+        if (isLeader()) {
           leaderState.submitStepDownEvent();
         }
         return CompletableFuture.completedFuture(exceptionReply);
       }
 
       // put the request into the pending queue
-      pending = leaderState.addPendingRequest(request, context);
+      pending = leaderState.addPendingRequest(permit, request, context);
+      if (pending == null) {
+        return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
+            "Failed to add a pending write request in " + getId() + " for " + 
request));
+      }
       leaderState.notifySenders();
     }
     return pending.getFuture();
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 391b1ea..92197e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
 import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
@@ -73,36 +74,47 @@ class WatchRequests {
     private final ReplicationLevel replication;
     private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
         
Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
+    private final ResourceSemaphore resource;
     private volatile long index; //Invariant: q.isEmpty() or index < any 
element q
 
-    WatchQueue(ReplicationLevel replication) {
+    WatchQueue(ReplicationLevel replication, int elementLimit) {
       this.replication = replication;
+      this.resource = new ResourceSemaphore(elementLimit);
     }
 
     long getIndex() {
       return index;
     }
 
-    PendingWatch add(RaftClientRequest request) {
+    CompletableFuture<Void> add(RaftClientRequest request) {
       final long currentTime = Timestamp.currentTimeNanos();
       final long roundUp = 
watchTimeoutDenominationNanos.roundUpNanos(currentTime);
       final PendingWatch pending = new 
PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp));
 
+      final PendingWatch computed;
       synchronized (this) {
-        if (pending.getIndex() > getIndex()) { // compare again synchronized
-          final PendingWatch previous = q.putIfAbsent(pending, pending);
-          if (previous != null) {
-            return previous;
-          }
-        } else {
+        if (pending.getIndex() <= getIndex()) { // compare again synchronized
+          // watch condition already satisfied
           return null;
         }
+        computed = q.compute(pending, (key, old) -> old != null? old: 
resource.tryAcquire()? pending: null);
       }
 
+      if (computed == null) {
+        // failed to acquire
+        return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
+            "Failed to acquire a pending watch request in " + name + " for " + 
request));
+      }
+      if (computed != pending) {
+        // already exists in q
+        return computed.getFuture();
+      }
+
+      // newly added to q
       final TimeDuration timeout = watchTimeoutNanos.apply(duration -> 
duration + roundUp - currentTime);
       scheduler.onTimeout(timeout, () -> handleTimeout(request, pending),
           LOG, () -> name + ": Failed to timeout " + request);
-      return pending;
+      return pending.getFuture();
     }
 
     void handleTimeout(RaftClientRequest request, PendingWatch pending) {
@@ -119,6 +131,7 @@ class WatchRequests {
         return false;
       }
       Preconditions.assertTrue(removed == pending);
+      resource.release();
       return true;
     }
 
@@ -146,6 +159,7 @@ class WatchRequests {
         pending.getFuture().completeExceptionally(e);
       }
       q.clear();
+      resource.close();
     }
   }
 
@@ -159,24 +173,25 @@ class WatchRequests {
   WatchRequests(Object name, RaftProperties properties) {
     this.name = name + "-" + getClass().getSimpleName();
 
-    final TimeDuration watchTimeout = 
RaftServerConfigKeys.watchTimeout(properties);
+    final TimeDuration watchTimeout = 
RaftServerConfigKeys.Watch.timeout(properties);
     this.watchTimeoutNanos = watchTimeout.to(TimeUnit.NANOSECONDS);
-    final TimeDuration watchTimeoutDenomination = 
RaftServerConfigKeys.watchTimeoutDenomination(properties);
+    final TimeDuration watchTimeoutDenomination = 
RaftServerConfigKeys.Watch.timeoutDenomination(properties);
     this.watchTimeoutDenominationNanos = 
watchTimeoutDenomination.to(TimeUnit.NANOSECONDS);
     Preconditions.assertTrue(watchTimeoutNanos.getDuration() % 
watchTimeoutDenominationNanos.getDuration() == 0L,
         () -> "watchTimeout (=" + watchTimeout + ") is not a multiple of 
watchTimeoutDenomination (="
             + watchTimeoutDenomination + ").");
 
-    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new 
WatchQueue(r)));
+    final int elementLimit = 
RaftServerConfigKeys.Watch.elementLimit(properties);
+    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new 
WatchQueue(r, elementLimit)));
   }
 
   CompletableFuture<Void> add(RaftClientRequest request) {
     final WatchRequestTypeProto watch = request.getType().getWatch();
     final WatchQueue queue = queues.get(watch.getReplication());
     if (watch.getIndex() > queue.getIndex()) { // compare without 
synchronization
-      final PendingWatch pending = queue.add(request);
-      if (pending != null) {
-        return pending.getFuture();
+      final CompletableFuture<Void> future = queue.add(request);
+      if (future != null) {
+        return future;
       }
     }
     return CompletableFuture.completedFuture(null);
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 52481d6..de7dc0e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -99,6 +99,12 @@ public abstract class MiniRaftCluster implements Closeable {
         return properties.get();
       }
 
+      default RaftProperties setStateMachine(Class<? extends StateMachine> 
stateMachineClass) {
+        final RaftProperties p = getProperties();
+        p.setClass(STATEMACHINE_CLASS_KEY, stateMachineClass, 
StateMachine.class);
+        return p;
+      }
+
       default CLUSTER newCluster(int numPeers) {
         return getFactory().newCluster(numPeers, getProperties());
       }
@@ -638,35 +644,20 @@ public abstract class MiniRaftCluster implements 
Closeable {
     return createClient(null, g);
   }
 
-  public RaftClient createClientWithLeader() {
-    return createClient(getLeader().getId(), group);
-  }
-
-  public RaftClient createClientWithFollower() {
-    return createClient(getFollowers().get(0).getId(), group);
-  }
-
   public RaftClient createClient(RaftPeerId leaderId) {
     return createClient(leaderId, group);
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy) 
{
-    return createClient(leaderId, group, null, retryPolicy);
+    return createClient(leaderId, group, retryPolicy);
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
-    return createClient(leaderId, group, null, getDefaultRetryPolicy());
-  }
-
-  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
-      ClientId clientId) {
-    return createClient(leaderId, group, clientId, getDefaultRetryPolicy());
+    return createClient(leaderId, group, getDefaultRetryPolicy());
   }
 
-  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
-      ClientId clientId, RetryPolicy retryPolicy) {
+  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, 
RetryPolicy retryPolicy) {
     RaftClient.Builder builder = RaftClient.newBuilder()
-        .setClientId(clientId)
         .setRaftGroup(group)
         .setLeaderId(leaderId)
         .setProperties(properties)
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
new file mode 100644
index 0000000..bb1ccfb
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.util.LogUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public abstract class RequestLimitAsyncBaseTest<CLUSTER extends 
MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+    RaftServerTestUtil.setPendingRequestsLogLevel(Level.DEBUG);
+  }
+
+  private final int writeElementLimit = 5;
+  private final int watchElementLimit = 2;
+  {
+    final RaftProperties p = setStateMachine(SimpleStateMachine4Testing.class);
+    RaftServerConfigKeys.Write.setElementLimit(p, writeElementLimit);
+    RaftServerConfigKeys.Watch.setElementLimit(p, watchElementLimit);
+
+    RaftServerConfigKeys.Rpc.setRequestTimeout(p, FIVE_SECONDS);
+    RaftClientConfigKeys.Rpc.setRequestTimeout(p, FIVE_SECONDS);
+  }
+
+  @Test
+  public void testWriteElementLimit() throws Exception {
+    runWithSameCluster(1, this::runTestWriteElementLimit);
+  }
+
+  void runTestWriteElementLimit(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    try (RaftClient c1 = cluster.createClient(leader.getId())) {
+      { // send first message to make sure the cluster is working
+        final SimpleMessage message = new SimpleMessage("first");
+        final CompletableFuture<RaftClientReply> future = 
c1.sendAsync(message);
+        final RaftClientReply reply = getWithDefaultTimeout(future);
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      // collecting futures returned from StateMachine.applyTransaction
+      final BlockingQueue<Runnable> toBeCompleted = 
SimpleStateMachine4Testing.get(leader).collecting().enable(
+          SimpleStateMachine4Testing.Collecting.Type.APPLY_TRANSACTION);
+
+      // send write requests up to the limit
+      final List<CompletableFuture<RaftClientReply>> writeFutures = new 
ArrayList<>();
+      for (int i = 0; i < writeElementLimit; i++) {
+        final SimpleMessage message = new SimpleMessage("m" + i);
+        writeFutures.add(c1.sendAsync(message));
+      }
+
+      // send watch requests up to the limit
+      final long watchBase = 1000; //watch a large index so that it won't 
complete
+      for (int i = 0; i < watchElementLimit; i++) {
+        c1.sendWatchAsync(watchBase + i, ReplicationLevel.ALL);
+      }
+
+      // sleep to make sure that all the request were sent
+      HUNDRED_MILLIS.sleep();
+
+      try(RaftClient c2 = cluster.createClient(leader.getId(), 
RetryPolicies.noRetry())) {
+        // more write requests should get ResourceUnavailableException
+        final SimpleMessage message = new SimpleMessage("err");
+        testFailureCase("send should fail", () -> c2.send(message),
+            ResourceUnavailableException.class);
+        testFailureCase("sendAsync should fail", () -> 
c2.sendAsync(message).get(),
+            ExecutionException.class, ResourceUnavailableException.class);
+
+        // more watch requests should get ResourceUnavailableException
+        final long watchIndex = watchBase + watchElementLimit;
+        testFailureCase("sendWatch should fail", () -> 
c2.sendWatch(watchIndex, ReplicationLevel.ALL),
+            ResourceUnavailableException.class);
+        testFailureCase("sendWatchAsync should fail", () -> 
c2.sendWatchAsync(watchIndex, ReplicationLevel.ALL).get(),
+            ExecutionException.class, ResourceUnavailableException.class);
+      }
+
+      // complete futures from applyTransaction
+      toBeCompleted.forEach(Runnable::run);
+      // check replies
+      for(CompletableFuture<RaftClientReply> f : writeFutures) {
+        final RaftClientReply reply = getWithDefaultTimeout(f);
+        Assert.assertTrue(reply.isSuccess());
+      }
+    }
+  }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 63aad09..86b6e4c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -331,14 +331,14 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
   @Test
   public void testWatchRequestTimeout() throws Exception {
     final RaftProperties p = getProperties();
-    RaftServerConfigKeys.setWatchTimeout(p, TimeDuration.valueOf(500, 
TimeUnit.MILLISECONDS));
-    RaftServerConfigKeys.setWatchTimeoutDenomination(p, 
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Watch.setTimeout(p, TimeDuration.valueOf(500, 
TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Watch.setTimeoutDenomination(p, 
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
     try {
       runWithNewCluster(NUM_SERVERS,
           cluster -> runTest(WatchRequestTests::runTestWatchRequestTimeout, 
cluster, LOG));
     } finally {
-      RaftServerConfigKeys.setWatchTimeout(p, 
RaftServerConfigKeys.WATCH_TIMEOUT_DEFAULT);
-      RaftServerConfigKeys.setWatchTimeoutDenomination(p, 
RaftServerConfigKeys.WATCH_TIMEOUT_DENOMINATION_DEFAULT);
+      RaftServerConfigKeys.Watch.setTimeout(p, 
RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
+      RaftServerConfigKeys.Watch.setTimeoutDenomination(p, 
RaftServerConfigKeys.Watch.TIMEOUT_DENOMINATION_DEFAULT);
     }
   }
 
@@ -347,8 +347,9 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     final MiniRaftCluster cluster = p.cluster;
     final int numMessages = p.numMessages;
 
-    final TimeDuration watchTimeout = 
RaftServerConfigKeys.watchTimeout(cluster.getProperties());
-    final TimeDuration watchTimeoutDenomination = 
RaftServerConfigKeys.watchTimeoutDenomination(cluster.getProperties());
+    final RaftProperties properties = cluster.getProperties();
+    final TimeDuration watchTimeout = 
RaftServerConfigKeys.Watch.timeout(properties);
+    final TimeDuration watchTimeoutDenomination = 
RaftServerConfigKeys.Watch.timeoutDenomination(properties);
 
     // blockStartTransaction of the leader so that no transaction can be 
committed MAJORITY
     final RaftServerImpl leader = cluster.getLeader();
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 4c3f06d..2488540 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -42,6 +42,9 @@ public class RaftServerTestUtil {
   public static void setWatchRequestsLogLevel(Level level) {
     LogUtils.setLogLevel(WatchRequests.LOG, level);
   }
+  public static void setPendingRequestsLogLevel(Level level) {
+    LogUtils.setLogLevel(PendingRequests.LOG, level);
+  }
 
   public static void waitAndCheckNewConf(MiniRaftCluster cluster,
       RaftPeer[] peers, int numOfRemovedPeers, Collection<RaftPeerId> 
deadPeers)
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 6a8c532..ac3813f 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -55,8 +55,10 @@ import java.util.EnumMap;
 import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -90,6 +92,36 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
 
   private volatile boolean running = true;
 
+  public static class Collecting {
+    public enum Type {
+      APPLY_TRANSACTION
+    }
+
+    private final EnumMap<Type, BlockingQueue<Runnable>> map = new 
EnumMap<>(Type.class);
+
+    BlockingQueue<Runnable> get(Type type) {
+      return map.get(type);
+    }
+
+    public BlockingQueue<Runnable> enable(Type type) {
+      final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
+      final BlockingQueue<Runnable> previous = map.put(type, q);
+      Preconditions.assertNull(previous, "previous");
+      return q;
+    }
+
+    <T> CompletableFuture<T> collect(Type type, T value) {
+      final BlockingQueue<Runnable> q = get(type);
+      if (q == null) {
+        return CompletableFuture.completedFuture(value);
+      }
+
+      final CompletableFuture<T> future = new CompletableFuture<>();
+      final boolean offered = q.offer(() -> future.complete(value));
+      Preconditions.assertTrue(offered);
+      return future;
+    }
+  }
 
   static class Blocking {
     enum Type {
@@ -122,6 +154,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   }
 
   private final Blocking blocking = new Blocking();
+  private final Collecting collecting = new Collecting();
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
   private volatile RoleInfoProto slownessInfo = null;
   private volatile RoleInfoProto leaderElectionTimeoutInfo = null;
@@ -141,6 +174,10 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     });
   }
 
+  public Collecting collecting() {
+    return collecting;
+  }
+
   public RoleInfoProto getSlownessInfo() {
     return slownessInfo;
   }
@@ -193,8 +230,9 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
     put(entry);
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
-    return CompletableFuture.completedFuture(
-        new SimpleMessage(entry.getIndex() + " OK"));
+
+    final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
+    return collecting.collect(Collecting.Type.APPLY_TRANSACTION, m);
   }
 
   @Override
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRequestLimitAsyncWithGrpc.java
similarity index 75%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
copy to 
ratis-test/src/test/java/org/apache/ratis/grpc/TestRequestLimitAsyncWithGrpc.java
index 0294bb4..19d71fe 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRequestLimitAsyncWithGrpc.java
@@ -15,13 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.protocol;
+package org.apache.ratis.grpc;
 
-/**
- * Retry failure as per the retryPolicy defined.
- */
-public class RaftRetryFailureException extends RaftException {
-  public RaftRetryFailureException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}
\ No newline at end of file
+import org.apache.ratis.RequestLimitAsyncBaseTest;
+
+public class TestRequestLimitAsyncWithGrpc
+    extends RequestLimitAsyncBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}

Reply via email to