This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f9f6354  HBASE-22322 Use special pause for CallQueueTooBigException
f9f6354 is described below

commit f9f63543933616ca887fa2dc954dfd7e649d0461
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Apr 29 10:20:33 2019 +0800

    HBASE-22322 Use special pause for CallQueueTooBigException
---
 .../hadoop/hbase/client/AsyncAdminBuilder.java     |  22 ++-
 .../hadoop/hbase/client/AsyncAdminBuilderBase.java |   9 +
 .../client/AsyncAdminRequestRetryingCaller.java    |   8 +-
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  24 ++-
 .../hadoop/hbase/client/AsyncClientScanner.java    |  29 +--
 .../hbase/client/AsyncConnectionConfiguration.java |  23 ++-
 .../AsyncMasterRequestRpcRetryingCaller.java       |   8 +-
 .../hbase/client/AsyncRpcRetryingCaller.java       |  13 +-
 .../client/AsyncRpcRetryingCallerFactory.java      |  50 ++++-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |  12 +-
 .../AsyncServerRequestRpcRetryingCaller.java       |   8 +-
 .../AsyncSingleRequestRpcRetryingCaller.java       |   8 +-
 .../hadoop/hbase/client/AsyncTableBuilder.java     |  13 ++
 .../hadoop/hbase/client/AsyncTableBuilderBase.java |   9 +
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  36 ++--
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  26 ++-
 .../TestAsyncClientPauseForCallQueueTooBig.java    | 204 +++++++++++++++++++++
 17 files changed, 422 insertions(+), 80 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
index 6a8db9e..49bc350 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
@@ -36,16 +36,12 @@ public interface AsyncAdminBuilder {
    * Set timeout for a whole admin operation. Operation timeout and max 
attempt times(or max retry
    * times) are both limitations for retrying, we will stop retrying when we 
reach any of the
    * limitations.
-   * @param timeout
-   * @param unit
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit);
 
   /**
    * Set timeout for each rpc request.
-   * @param timeout
-   * @param unit
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit);
@@ -53,17 +49,27 @@ public interface AsyncAdminBuilder {
   /**
    * Set the base pause time for retrying. We use an exponential policy to 
generate sleep time when
    * retrying.
-   * @param timeout
-   * @param unit
    * @return this for invocation chaining
+   * @see #setRetryPauseForCQTBE(long, TimeUnit)
    */
   AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
 
   /**
+   * Set the base pause time for retrying when we hit {@code 
CallQueueTooBigException}. We use an
+   * exponential policy to generate sleep time when retrying.
+   * <p/>
+   * This value should be greater than the normal pause value which could be 
set with the above
+   * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code 
CallQueueTooBigException}
+   * means the server is overloaded. We just use the normal pause value for
+   * {@code CallQueueTooBigException} if here you specify a smaller value.
+   * @see #setRetryPause(long, TimeUnit)
+   */
+  AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
+
+  /**
    * Set the max retry times for an admin operation. Usually it is the max 
attempt times minus 1.
    * Operation timeout and max attempt times(or max retry times) are both 
limitations for retrying,
    * we will stop retrying when we reach any of the limitations.
-   * @param maxRetries
    * @return this for invocation chaining
    */
   default AsyncAdminBuilder setMaxRetries(int maxRetries) {
@@ -74,14 +80,12 @@ public interface AsyncAdminBuilder {
    * Set the max attempt times for an admin operation. Usually it is the max 
retry times plus 1.
    * Operation timeout and max attempt times(or max retry times) are both 
limitations for retrying,
    * we will stop retrying when we reach any of the limitations.
-   * @param maxAttempts
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setMaxAttempts(int maxAttempts);
 
   /**
    * Set the number of retries that are allowed before we start to log.
-   * @param startLogErrorsCnt
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
index 00896ef..ffb3ae9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
@@ -33,6 +33,8 @@ abstract class AsyncAdminBuilderBase implements 
AsyncAdminBuilder {
 
   protected long pauseNs;
 
+  protected long pauseForCQTBENs;
+
   protected int maxAttempts;
 
   protected int startLogErrorsCnt;
@@ -41,6 +43,7 @@ abstract class AsyncAdminBuilderBase implements 
AsyncAdminBuilder {
     this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
     this.operationTimeoutNs = connConf.getOperationTimeoutNs();
     this.pauseNs = connConf.getPauseNs();
+    this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
     this.maxAttempts = connConf.getMaxRetries();
     this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
   }
@@ -64,6 +67,12 @@ abstract class AsyncAdminBuilderBase implements 
AsyncAdminBuilder {
   }
 
   @Override
+  public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(timeout);
+    return this;
+  }
+
+  @Override
   public AsyncAdminBuilder setMaxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index ce0fca7..7a381db 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -44,10 +44,10 @@ public class AsyncAdminRequestRetryingCaller<T> extends 
AsyncRpcRetryingCaller<T
   private ServerName serverName;
 
   public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl 
conn, int priority,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long 
rpcTimeoutNs,
-      int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
-    super(retryTimer, conn, priority, pauseNs, maxAttempts, 
operationTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      long pauseNs, long pauseForCQTBENs, int maxAttempts, long 
operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, 
Callable<T> callable) {
+    super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, 
operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.serverName = serverName;
     this.callable = callable;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index e429422..464eff5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -45,6 +45,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -103,6 +104,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private final int maxAttempts;
 
   private final long operationTimeoutNs;
@@ -147,17 +150,17 @@ class AsyncBatchRpcRetryingCaller<T> {
   }
 
   public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl 
conn,
-      TableName tableName, List<? extends Row> actions, long pauseNs, int 
maxAttempts,
-      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      TableName tableName, List<? extends Row> actions, long pauseNs, long 
pauseForCQTBENs,
+      int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
-
     this.actions = new ArrayList<>(actions.size());
     this.futures = new ArrayList<>(actions.size());
     this.action2Future = new IdentityHashMap<>(actions.size());
@@ -337,7 +340,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       }
     });
     if (!failedActions.isEmpty()) {
-      tryResubmit(failedActions.stream(), tries, 
retryImmediately.booleanValue());
+      tryResubmit(failedActions.stream(), tries, 
retryImmediately.booleanValue(), false);
     }
   }
 
@@ -442,24 +445,27 @@ class AsyncBatchRpcRetryingCaller<T> {
     List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r 
-> r.actions.stream())
       .collect(Collectors.toList());
     addError(copiedActions, error, serverName);
-    tryResubmit(copiedActions.stream(), tries, error instanceof 
RetryImmediatelyException);
+    tryResubmit(copiedActions.stream(), tries, error instanceof 
RetryImmediatelyException,
+      error instanceof CallQueueTooBigException);
   }
 
-  private void tryResubmit(Stream<Action> actions, int tries, boolean 
immediately) {
+  private void tryResubmit(Stream<Action> actions, int tries, boolean 
immediately,
+      boolean isCallQueueTooBig) {
     if (immediately) {
       groupAndSend(actions, tries);
       return;
     }
     long delayNs;
+    long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
     if (operationTimeoutNs > 0) {
       long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         failAll(actions, tries);
         return;
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, 
TimeUnit.NANOSECONDS);
   }
@@ -498,7 +504,7 @@ class AsyncBatchRpcRetryingCaller<T> {
           sendOrDelay(actionsByServer, tries);
         }
         if (!locateFailed.isEmpty()) {
-          tryResubmit(locateFailed.stream(), tries, false);
+          tryResubmit(locateFailed.stream(), tries, false, false);
         }
       });
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index d6cca48..5fd00a5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -73,6 +73,8 @@ class AsyncClientScanner {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private final int maxAttempts;
 
   private final long scanTimeoutNs;
@@ -84,8 +86,8 @@ class AsyncClientScanner {
   private final ScanResultCache resultCache;
 
   public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, 
TableName tableName,
-      AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int 
maxAttempts, long scanTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
+      AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long 
pauseForCQTBENs,
+      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
     if (scan.getStartRow() == null) {
       scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
     }
@@ -98,6 +100,7 @@ class AsyncClientScanner {
     this.conn = conn;
     this.retryTimer = retryTimer;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -160,14 +163,16 @@ class AsyncClientScanner {
   }
 
   private void startScan(OpenScannerResponse resp) {
-    
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
-      .location(resp.loc).remote(resp.isRegionServerRemote)
-      .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), 
TimeUnit.MILLISECONDS).stub(resp.stub)
-      
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
-      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-      .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
-      .start(resp.controller, resp.resp), (hasMore, error) -> {
+    addListener(
+      
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+        .remote(resp.isRegionServerRemote)
+        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), 
TimeUnit.MILLISECONDS).stub(resp.stub)
+        
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
+        .pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, 
resp.resp),
+      (hasMore, error) -> {
         if (error != null) {
           consumer.onError(error);
           return;
@@ -185,8 +190,8 @@ class AsyncClientScanner {
       
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
-      
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
-      .call();
+      .pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+      
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
   }
 
   private long getPrimaryTimeoutNs() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 22042c9..6596578 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -30,6 +30,7 @@ import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_
 import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
 import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
 import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@@ -54,6 +55,8 @@ import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFE
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Timeout configs.
@@ -61,6 +64,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class AsyncConnectionConfiguration {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
+
   private final long metaOperationTimeoutNs;
 
   // timeout for a whole operation such as get, put or delete. Notice that 
scan will not be effected
@@ -79,6 +84,8 @@ class AsyncConnectionConfiguration {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private final int maxRetries;
 
   /** How many retries are allowed before we start to log */
@@ -121,8 +128,16 @@ class AsyncConnectionConfiguration {
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, 
rpcTimeoutNs));
     this.writeRpcTimeoutNs =
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, 
rpcTimeoutNs));
-    this.pauseNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, 
DEFAULT_HBASE_CLIENT_PAUSE));
+    long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, 
DEFAULT_HBASE_CLIENT_PAUSE);
+    long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
+    if (pauseForCQTBEMs < pauseMs) {
+      LOG.warn(
+        "The {} setting: {} ms is less than the {} setting: {} ms, use the 
greater one instead",
+        HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, 
pauseMs);
+      pauseForCQTBEMs = pauseMs;
+    }
+    this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
+    this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, 
DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.startLogErrorsCnt =
       conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 
DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@@ -173,6 +188,10 @@ class AsyncConnectionConfiguration {
     return pauseNs;
   }
 
+  long getPauseForCQTBENs() {
+    return pauseForCQTBENs;
+  }
+
   int getMaxRetries() {
     return maxRetries;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index 5ba4dee..de2778c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -44,10 +44,10 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends 
AsyncRpcRetryingCall
   private final Callable<T> callable;
 
   public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, 
AsyncConnectionImpl conn,
-      Callable<T> callable, int priority, long pauseNs, int maxRetries, long 
operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
-    super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, 
rpcTimeoutNs,
-      startLogErrorsCnt);
+      Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, 
int maxRetries,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, 
operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.callable = callable;
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 387b103..dcf7aa1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
@@ -59,6 +60,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private int tries = 1;
 
   private final int maxAttempts;
@@ -78,12 +81,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
   protected final HBaseRpcController controller;
 
   public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 
int priority,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long 
rpcTimeoutNs,
-      int startLogErrorsCnt) {
+      long pauseNs, long pauseForCQTBENs, int maxAttempts, long 
operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.priority = priority;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -123,6 +127,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
   }
 
   private void tryScheduleRetry(Throwable error, Consumer<Throwable> 
updateCachedLocation) {
+    long pauseNsToUse = error instanceof CallQueueTooBigException ? 
pauseForCQTBENs : pauseNs;
     long delayNs;
     if (operationTimeoutNs > 0) {
       long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
@@ -130,9 +135,9 @@ public abstract class AsyncRpcRetryingCaller<T> {
         completeExceptionally();
         return;
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     tries++;
     retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 513f813..48bde44 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -58,6 +58,8 @@ class AsyncRpcRetryingCallerFactory {
 
     protected long pauseNs = conn.connConf.getPauseNs();
 
+    protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs();
+
     protected int maxAttempts = 
retries2Attempts(conn.connConf.getMaxRetries());
 
     protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
@@ -117,6 +119,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit 
unit) {
+      this.pauseForCQTBENs = unit.toNanos(pause);
+      return this;
+    }
+
     public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
       this.maxAttempts = maxAttempts;
       return this;
@@ -149,8 +156,8 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
       preCheck();
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, 
tableName, row, replicaId,
-        locateType, callable, priority, pauseNs, maxAttempts, 
operationTimeoutNs, rpcTimeoutNs,
-        startLogErrorsCnt);
+        locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, 
operationTimeoutNs,
+        rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -256,6 +263,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit 
unit) {
+      this.pauseForCQTBENs = unit.toNanos(pause);
+      return this;
+    }
+
     public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
       this.maxAttempts = maxAttempts;
       return this;
@@ -280,8 +292,8 @@ class AsyncRpcRetryingCallerFactory {
       preCheck();
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, 
scan, scanMetrics,
         scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, 
priority,
-        scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, 
rpcTimeoutNs,
-        startLogErrorsCnt);
+        scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, 
scanTimeoutNs,
+        rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -335,6 +347,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
+      this.pauseForCQTBENs = unit.toNanos(pause);
+      return this;
+    }
+
     public BatchCallerBuilder maxAttempts(int maxAttempts) {
       this.maxAttempts = maxAttempts;
       return this;
@@ -347,7 +364,7 @@ class AsyncRpcRetryingCallerFactory {
 
     public <T> AsyncBatchRpcRetryingCaller<T> build() {
       return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, 
actions, pauseNs,
-        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 
startLogErrorsCnt);
     }
 
     public <T> List<CompletableFuture<T>> call() {
@@ -389,6 +406,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit 
unit) {
+      this.pauseForCQTBENs = unit.toNanos(pause);
+      return this;
+    }
+
     public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
       this.maxAttempts = maxAttempts;
       return this;
@@ -416,7 +438,7 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncMasterRequestRpcRetryingCaller<T> build() {
       preCheck();
       return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, 
callable, priority,
-        pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 
startLogErrorsCnt);
+        pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, 
rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -465,6 +487,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit 
unit) {
+      this.pauseForCQTBENs = unit.toNanos(pause);
+      return this;
+    }
+
     public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
       this.maxAttempts = maxAttempts;
       return this;
@@ -487,7 +514,7 @@ class AsyncRpcRetryingCallerFactory {
 
     public AsyncAdminRequestRetryingCaller<T> build() {
       return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, 
priority, pauseNs,
-        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+        pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 
startLogErrorsCnt,
         checkNotNull(serverName, "serverName is null"), checkNotNull(callable, 
"action is null"));
     }
 
@@ -531,6 +558,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit 
unit) {
+      this.pauseForCQTBENs = unit.toNanos(pause);
+      return this;
+    }
+
     public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
       this.maxAttempts = maxAttempts;
       return this;
@@ -547,8 +579,8 @@ class AsyncRpcRetryingCallerFactory {
     }
 
     public AsyncServerRequestRpcRetryingCaller<T> build() {
-      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, 
pauseNs, maxAttempts,
-        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, 
pauseNs, pauseForCQTBENs,
+        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
         checkNotNull(serverName, "serverName is null"), checkNotNull(callable, 
"action is null"));
     }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index b87d170..1fa3c81 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -97,6 +98,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private final int maxAttempts;
 
   private final long scanTimeoutNs;
@@ -304,7 +307,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache 
resultCache,
       AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
       boolean isRegionServerRemote, int priority, long 
scannerLeaseTimeoutPeriodNs, long pauseNs,
-      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
+      long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long 
rpcTimeoutNs,
+      int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
@@ -316,6 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.regionServerRemote = isRegionServerRemote;
     this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -405,15 +410,16 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
     long delayNs;
+    long pauseNsToUse = error instanceof CallQueueTooBigException ? 
pauseForCQTBENs : pauseNs;
     if (scanTimeoutNs > 0) {
       long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally(!scannerClosed);
         return;
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     if (scannerClosed) {
       completeWhenError(false);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 63c85c2..52a2abe 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -46,10 +46,10 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends 
AsyncRpcRetryingCall
   private ServerName serverName;
 
   public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, 
AsyncConnectionImpl conn,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long 
rpcTimeoutNs,
-      int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
-    super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, 
operationTimeoutNs,
-      rpcTimeoutNs, startLogErrorsCnt);
+      long pauseNs, long pauseForCQTBENs, int maxAttempts, long 
operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, 
Callable<T> callable) {
+    super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, 
maxAttempts,
+      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     this.serverName = serverName;
     this.callable = callable;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 9b0dede..2a552c7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -56,10 +56,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends 
AsyncRpcRetryingCaller<T> {
 
   public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, 
AsyncConnectionImpl conn,
       TableName tableName, byte[] row, int replicaId, RegionLocateType 
locateType,
-      Callable<T> callable, int priority, long pauseNs, int maxAttempts, long 
operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
-    super(retryTimer, conn, priority, pauseNs, maxAttempts, 
operationTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, 
int maxAttempts,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, 
operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.tableName = tableName;
     this.row = row;
     this.replicaId = replicaId;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
index 6632ad5..4c883a8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -76,10 +76,23 @@ public interface AsyncTableBuilder<C extends 
ScanResultConsumerBase> {
   /**
    * Set the base pause time for retrying. We use an exponential policy to 
generate sleep time when
    * retrying.
+   * @see #setRetryPauseForCQTBE(long, TimeUnit)
    */
   AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
 
   /**
+   * Set the base pause time for retrying when we hit {@code 
CallQueueTooBigException}. We use an
+   * exponential policy to generate sleep time when retrying.
+   * <p/>
+   * This value should be greater than the normal pause value which could be 
set with the above
+   * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code 
CallQueueTooBigException}
+   * means the server is overloaded. We just use the normal pause value for
+   * {@code CallQueueTooBigException} if you specify a smaller value here.
+   * @see #setRetryPause(long, TimeUnit)
+   */
+  AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit);
+
+  /**
    * Set the max retry times for an operation. Usually it is the max attempt 
times minus 1.
    * <p>
    * Operation timeout and max attempt times(or max retry times) are both 
limitations for retrying,
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index ee571f1..399d9dd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -45,6 +45,8 @@ abstract class AsyncTableBuilderBase<C extends 
ScanResultConsumerBase>
 
   protected long pauseNs;
 
+  protected long pauseForCQTBENs;
+
   protected int maxAttempts;
 
   protected int startLogErrorsCnt;
@@ -58,6 +60,7 @@ abstract class AsyncTableBuilderBase<C extends 
ScanResultConsumerBase>
     this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
     this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
     this.pauseNs = connConf.getPauseNs();
+    this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
     this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
     this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
   }
@@ -99,6 +102,12 @@ abstract class AsyncTableBuilderBase<C extends 
ScanResultConsumerBase>
   }
 
   @Override
+  public AsyncTableBuilderBase<C> setRetryPauseForCQTBE(long pause, TimeUnit 
unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
+
+  @Override
   public AsyncTableBuilderBase<C> setMaxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 35cb922..0fd3cba 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -327,6 +327,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private final int maxAttempts;
 
   private final int startLogErrorsCnt;
@@ -341,6 +343,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     this.rpcTimeoutNs = builder.rpcTimeoutNs;
     this.operationTimeoutNs = builder.operationTimeoutNs;
     this.pauseNs = builder.pauseNs;
+    if (builder.pauseForCQTBENs < builder.pauseNs) {
+      LOG.warn(
+        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+          " the normal pause value {} ms, use the greater one instead",
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+      this.pauseForCQTBENs = builder.pauseNs;
+    } else {
+      this.pauseForCQTBENs = builder.pauseForCQTBENs;
+    }
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.ng = connection.getNonceGenerator();
@@ -348,18 +360,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
     return this.connection.callerFactory.<T> masterRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
 
   private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
     return this.connection.callerFactory.<T> adminRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
 
   @FunctionalInterface
@@ -3357,10 +3369,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private <T> ServerRequestCallerBuilder<T> newServerCaller() {
     return this.connection.callerFactory.<T> serverRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
 
   @Override
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index b2ca3a9..8050137 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -77,6 +79,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
   private final AsyncConnectionImpl conn;
 
   private final Timer retryTimer;
@@ -99,6 +103,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
 
   private final long pauseNs;
 
+  private final long pauseForCQTBENs;
+
   private final int maxAttempts;
 
   private final int startLogErrorsCnt;
@@ -113,6 +119,16 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     this.operationTimeoutNs = builder.operationTimeoutNs;
     this.scanTimeoutNs = builder.scanTimeoutNs;
     this.pauseNs = builder.pauseNs;
+    if (builder.pauseForCQTBENs < builder.pauseNs) {
+      LOG.warn(
+        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+          " the normal pause value {} ms, use the greater one instead",
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+      this.pauseForCQTBENs = builder.pauseNs;
+    } else {
+      this.pauseForCQTBENs = builder.pauseForCQTBENs;
+    }
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.defaultScannerCaching = tableName.isSystemTable() ? 
conn.connConf.getMetaScannerCaching()
@@ -220,8 +236,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     return conn.callerFactory.<T> 
single().table(tableName).row(row).priority(priority)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-      .startLogErrorsCnt(startLogErrorsCnt);
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
 
   private <T, R extends OperationWithAttributes & Row> 
SingleRequestCallerBuilder<T> newCaller(
@@ -451,7 +467,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
     new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, 
conn, retryTimer,
-      pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 
startLogErrorsCnt).start();
+      pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 
startLogErrorsCnt)
+        .start();
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -521,7 +538,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     return conn.callerFactory.batch().table(tableName).actions(actions)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+      .pauseForCQTBE(pauseForCQTBENs, 
TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+      .startLogErrorsCnt(startLogErrorsCnt).call();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
new file mode 100644
index 0000000..075e1bc
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPauseForCallQueueTooBig {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("CQTBE");
+
+  private static byte[] FAMILY = Bytes.toBytes("Family");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
+
+  private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
+
+  private static AsyncConnection CONN;
+
+  private static boolean FAIL = false;
+
+  private static ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED = new 
ConcurrentHashMap<>();
+
+  public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
+
+    public CQTBERpcScheduler(Configuration conf, int handlerCount, int 
priorityHandlerCount,
+        int replicationHandlerCount, int metaTransitionHandler, 
PriorityFunction priority,
+        Abortable server, int highPriorityLevel) {
+      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
+        metaTransitionHandler, priority, server, highPriorityLevel);
+    }
+
+    @Override
+    public boolean dispatch(CallRunner callTask) throws InterruptedException {
+      if (FAIL) {
+        MethodDescriptor method = callTask.getRpcCall().getMethod();
+        // this is for test scan, where we will send a open scanner first and 
then a next, and we
+        // expect that we hit CQTBE two times.
+        if (INVOKED.computeIfAbsent(method, k -> new 
AtomicInteger(0)).getAndIncrement() % 2 == 0) {
+          return false;
+        }
+      }
+      return super.dispatch(callTask);
+    }
+  }
+
+  public static final class CQTBERpcSchedulerFactory extends 
SimpleRpcSchedulerFactory {
+
+    @Override
+    public RpcScheduler create(Configuration conf, PriorityFunction priority, 
Abortable server) {
+      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+      return new CQTBERpcScheduler(conf, handlerCount,
+        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
+        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
+          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
+        priority, server, HConstants.QOS_THRESHOLD);
+    }
+
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
+      TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
+    
UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+      CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
+    UTIL.startMiniCluster(1);
+    CONN = 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException {
+    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, 
Bytes.toBytes(i)));
+      }
+    }
+    FAIL = true;
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    FAIL = false;
+    INVOKED.clear();
+    UTIL.getAdmin().disableTable(TABLE_NAME);
+    UTIL.getAdmin().deleteTable(TABLE_NAME);
+  }
+
+  private void assertTime(Callable<Void> callable, long time) throws Exception 
{
+    long startNs = System.nanoTime();
+    callable.call();
+    long costNs = System.nanoTime() - startNs;
+    assertTrue(costNs > time);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    assertTime(() -> {
+      Result result = CONN.getTable(TABLE_NAME).get(new 
Get(Bytes.toBytes(0))).get();
+      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
+      return null;
+    }, PAUSE_FOR_CQTBE_NS);
+  }
+
+  @Test
+  public void testBatch() throws Exception {
+    assertTime(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) 
{
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, 
Bytes.toBytes(i))));
+        }
+      }
+      return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).get();
+    }, PAUSE_FOR_CQTBE_NS);
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // we will hit CallQueueTooBigException two times so the sleep time should 
be twice
+    assertTime(() -> {
+      try (
+        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new 
Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, 
QUALIFIER));
+        }
+        assertNull(scanner.next());
+      }
+      return null;
+    }, PAUSE_FOR_CQTBE_NS * 2);
+  }
+}

Reply via email to