This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 9361ae506a1 HBASE-28385 Improve scan quota estimates when using block bytes scanned (#5713)
9361ae506a1 is described below

commit 9361ae506a112ccd1525a03da866bf2286931df9
Author: Ray Mattingly <rmdmattin...@gmail.com>
AuthorDate: Wed Mar 13 17:58:54 2024 -0400

    HBASE-28385 Improve scan quota estimates when using block bytes scanned (#5713)
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../hadoop/hbase/quotas/DefaultOperationQuota.java |  89 ++++++++++++--
 .../hadoop/hbase/quotas/ExceedOperationQuota.java  |  33 +++++-
 .../hadoop/hbase/quotas/NoopOperationQuota.java    |  10 +-
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java      |   5 +
 .../apache/hadoop/hbase/quotas/OperationQuota.java |  20 +++-
 .../apache/hadoop/hbase/quotas/QuotaLimiter.java   |   3 +
 .../hbase/quotas/RegionServerRpcQuotaManager.java  |  80 ++++++++++---
 .../hadoop/hbase/quotas/TimeBasedLimiter.java      |   5 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  31 ++++-
 .../hbase/quotas/TestBlockBytesScannedQuota.java   |  71 ++++++++++--
 .../hbase/quotas/TestDefaultOperationQuota.java    | 128 +++++++++++++++++++++
 .../hadoop/hbase/quotas/ThrottleQuotaTestUtil.java |   7 +-
 12 files changed, 427 insertions(+), 55 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index a4ff8b2a859..2e26765a6a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -27,10 +27,17 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DefaultOperationQuota implements OperationQuota {
 
+  // a single scan estimate can consume no more than this proportion of the limiter's limit
+  // this prevents a long-running scan from being estimated at, say, 100MB of IO against
+  // a <100MB/IO throttle (because this would never succeed)
+  private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;
+
   protected final List<QuotaLimiter> limiters;
   private final long writeCapacityUnit;
   private final long readCapacityUnit;
@@ -53,6 +60,7 @@ public class DefaultOperationQuota implements OperationQuota {
   protected long readCapacityUnitDiff = 0;
   private boolean useResultSizeBytes;
   private long blockSizeBytes;
+  private long maxScanEstimate;
 
   public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
     final QuotaLimiter... limiters) {
@@ -60,6 +68,9 @@ public class DefaultOperationQuota implements OperationQuota {
     this.useResultSizeBytes =
       conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
     this.blockSizeBytes = blockSizeBytes;
+    long readSizeLimit =
+      Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
+    maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
   }
 
   /**
@@ -80,21 +91,34 @@ public class DefaultOperationQuota implements OperationQuota {
   }
 
   @Override
-  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
-    updateEstimateConsumeQuota(numWrites, numReads, numScans);
+  public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
+    updateEstimateConsumeBatchQuota(numWrites, numReads);
+    checkQuota(numWrites, numReads);
+  }
+
+  @Override
+  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
+    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
+      prevBlockBytesScannedDifference);
+    checkQuota(0, 1);
+  }
 
+  private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
     readAvailable = Long.MAX_VALUE;
     for (final QuotaLimiter limiter : limiters) {
-      if (limiter.isBypass()) continue;
+      if (limiter.isBypass()) {
+        continue;
+      }
 
-      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+      limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
         writeCapacityUnitConsumed, readCapacityUnitConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
     }
 
     for (final QuotaLimiter limiter : limiters) {
-      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
-        writeCapacityUnitConsumed, readCapacityUnitConsumed);
+      limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
+        readCapacityUnitConsumed);
     }
   }
 
@@ -158,24 +182,69 @@ public class DefaultOperationQuota implements OperationQuota {
    * Update estimate quota(read/write size/capacityUnits) which will be consumed
    * @param numWrites the number of write requests
    * @param numReads  the number of read requests
-   * @param numScans  the number of scan requests
    */
-  protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
+  protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
     writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
 
     if (useResultSizeBytes) {
       readConsumed = estimateConsume(OperationType.GET, numReads, 100);
-      readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
     } else {
       // assume 1 block required for reads. this is probably a low estimate, which is okay
       readConsumed = numReads > 0 ? blockSizeBytes : 0;
-      readConsumed += numScans > 0 ? blockSizeBytes : 0;
     }
 
     writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
     readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
   }
 
+  /**
+   * Update estimate quota(read/write size/capacityUnits) which will be consumed
+   * @param scanRequest                     the scan to be executed
+   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
+   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
+   *                                        scanner
+   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
+   *                                        calls
+   */
+  protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
+    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
+    if (useResultSizeBytes) {
+      readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
+    } else {
+      long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
+        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
+      readConsumed = Math.min(maxScanEstimate, estimate);
+    }
+
+    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+  }
+
+  protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
+    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
+    /*
+     * Estimating scan workload is more complicated, and if we severely underestimate workloads then
+     * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
+     */
+    if (nextCallSeq == 0) {
+      // start scanners with an optimistic 1 block IO estimate
+      // it is better to underestimate a large scan in the beginning
+      // than to overestimate, and block, a small scan
+      return blockSizeBytes;
+    }
+
+    boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
+    if (isWorkloadGrowing) {
+      // if nextCallSeq > 0 and the workload is growing then our estimate
+      // should consider that the workload may continue to increase
+      return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
+    } else {
+      // if nextCallSeq > 0 and the workload is shrinking or flat
+      // then our workload has likely plateaued. We can just rely on the existing
+      // maxBlockBytesScanned as our estimate in this case.
+      return maxBlockBytesScanned;
+    }
+  }
+
   private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
     if (numReqs > 0) {
       return avgSize * numReqs;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
index 1788e550f22..3077d6dac53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
@@ -23,6 +23,8 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
 /*
  * Internal class used to check and consume quota if exceed throttle quota is enabled. Exceed
  * throttle quota means, user can over consume user/namespace/table quota if region server has
@@ -47,15 +49,32 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
   }
 
   @Override
-  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
+  public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
+    Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads);
+    CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads);
+    checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0);
+  }
+
+  @Override
+  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
+    Runnable estimateQuota = () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize,
+      maxBlockBytesScanned, prevBlockBytesScannedDifference);
+    CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize,
+      maxBlockBytesScanned, prevBlockBytesScannedDifference);
+    checkQuota(estimateQuota, checkQuota, 0, 0, 1);
+  }
+
+  private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites,
+    int numReads, int numScans) throws RpcThrottlingException {
     if (regionServerLimiter.isBypass()) {
       // If region server limiter is bypass, which means no region server quota is set, check and
       // throttle by all other quotas. In this condition, exceed throttle quota will not work.
       LOG.warn("Exceed throttle quota is enabled but no region server quotas found");
-      super.checkQuota(numWrites, numReads, numScans);
+      checkQuota.run();
     } else {
       // 1. Update estimate quota which will be consumed
-      updateEstimateConsumeQuota(numWrites, numReads, numScans);
+      estimateQuota.run();
       // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
       regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
         writeCapacityUnitConsumed, readCapacityUnitConsumed);
@@ -63,11 +82,11 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
       // limiter is enough.
       boolean exceed = false;
       try {
-        super.checkQuota(numWrites, numReads, numScans);
+        checkQuota.run();
       } catch (RpcThrottlingException e) {
         exceed = true;
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, "
+          LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{}, scans:{}, "
             + "try use region server quota", numWrites, numReads, numScans);
         }
       }
@@ -96,4 +115,8 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
       regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff);
     }
   }
+
+  private interface CheckQuotaRunnable {
+    void run() throws RpcThrottlingException;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index b64429d9adc..736560e6fd1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
 /**
  * Noop operation quota returned when no quota is associated to the user/table
  */
@@ -40,7 +42,13 @@ class NoopOperationQuota implements OperationQuota {
   }
 
   @Override
-  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
+  public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
+    // no-op
+  }
+
+  @Override
+  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
     // no-op
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 63d7610115a..cf1e49c12e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -70,6 +70,11 @@ class NoopQuotaLimiter implements QuotaLimiter {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public long getReadLimit() {
+    return Long.MAX_VALUE;
+  }
+
   @Override
   public String toString() {
     return "NoopQuotaLimiter";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index bedad5e9867..ef0a35fa589 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
 /**
  * Interface that allows to check the quota available for an operation.
  */
@@ -51,11 +53,25 @@ public interface OperationQuota {
    * on the number of operations to perform and the average size accumulated during time.
    * @param numWrites number of write operation that will be performed
    * @param numReads  number of small-read operation that will be performed
-   * @param numScans  number of long-read operation that will be performed
    * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
    *                                exceeded.
    */
-  void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException;
+  void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException;
+
+  /**
+   * Checks if it is possible to execute the scan. The quota will be estimated based on the
+   * composition of the scan.
+   * @param scanRequest                     the given scan operation
+   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
+   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
+   *                                        scanner
+   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
+   *                                        calls
+   * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
+   *                                exceeded.
+   */
+  void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException;
 
   /** Cleanup method on operation completion */
   void close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index 14326e4e0d2..8d00a702e25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -76,6 +76,9 @@ public interface QuotaLimiter {
   /** Returns the number of bytes available to read to avoid exceeding the quota */
   long getReadAvailable();
 
+  /** Returns the maximum number of bytes ever available to read */
+  long getReadLimit();
+
   /** Returns the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 3c72c662887..92a0cfd5c13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -156,38 +156,82 @@ public class RegionServerRpcQuotaManager {
 
   /**
    * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
-   * available quota and to report the data/usage of the operation.
+   * available quota and to report the data/usage of the operation. This method is specific to scans
+   * because estimating a scan's workload is more complicated than estimating the workload of a
+   * get/put.
+   * @param region                          the region where the operation will be performed
+   * @param scanRequest                     the scan to be estimated against the quota
+   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
+   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
+   *                                        scanner
+   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
+   *                                        calls
+   * @return the OperationQuota
+   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkScanQuota(final Region region,
+    final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+    long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
+    throws IOException, RpcThrottlingException {
+    Optional<User> user = RpcServer.getRequestUser();
+    UserGroupInformation ugi;
+    if (user.isPresent()) {
+      ugi = user.get().getUGI();
+    } else {
+      ugi = User.getCurrent().getUGI();
+    }
+    TableDescriptor tableDescriptor = region.getTableDescriptor();
+    TableName table = tableDescriptor.getTableName();
+
+    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
+    try {
+      quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
+        prevBlockBytesScannedDifference);
+    } catch (RpcThrottlingException e) {
+      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan="
+        + scanRequest.getScannerId() + ": " + e.getMessage());
+      throw e;
+    }
+    return quota;
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+   * available quota and to report the data/usage of the operation. This method does not support
+   * scans because estimating a scan's workload is more complicated than estimating the workload of
+   * a get/put.
    * @param region the region where the operation will be performed
    * @param type   the operation type
    * @return the OperationQuota
    * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
    */
-  public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
-    throws IOException, RpcThrottlingException {
+  public OperationQuota checkBatchQuota(final Region region,
+    final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
     switch (type) {
-      case SCAN:
-        return checkQuota(region, 0, 0, 1);
       case GET:
-        return checkQuota(region, 0, 1, 0);
+        return this.checkBatchQuota(region, 0, 1);
       case MUTATE:
-        return checkQuota(region, 1, 0, 0);
+        return this.checkBatchQuota(region, 1, 0);
       case CHECK_AND_MUTATE:
-        return checkQuota(region, 1, 1, 0);
+        return this.checkBatchQuota(region, 1, 1);
     }
     throw new RuntimeException("Invalid operation type: " + type);
   }
 
   /**
    * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
-   * available quota and to report the data/usage of the operation.
+   * available quota and to report the data/usage of the operation. This method does not support
+   * scans because estimating a scan's workload is more complicated than estimating the workload of
+   * a get/put.
    * @param region       the region where the operation will be performed
    * @param actions      the "multi" actions to perform
    * @param hasCondition whether the RegionAction has a condition
    * @return the OperationQuota
    * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
    */
-  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
-    boolean hasCondition) throws IOException, RpcThrottlingException {
+  public OperationQuota checkBatchQuota(final Region region,
+    final List<ClientProtos.Action> actions, boolean hasCondition)
+    throws IOException, RpcThrottlingException {
     int numWrites = 0;
     int numReads = 0;
     for (final ClientProtos.Action action : actions) {
@@ -202,7 +246,7 @@ public class RegionServerRpcQuotaManager {
         numReads++;
       }
     }
-    return checkQuota(region, numWrites, numReads, 0);
+    return checkBatchQuota(region, numWrites, numReads);
   }
 
   /**
@@ -211,12 +255,11 @@ public class RegionServerRpcQuotaManager {
    * @param region    the region where the operation will be performed
    * @param numWrites number of writes to perform
    * @param numReads  number of short-reads to perform
-   * @param numScans  number of scan to perform
    * @return the OperationQuota
    * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
    */
-  private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
-    final int numScans) throws IOException, RpcThrottlingException {
+  private OperationQuota checkBatchQuota(final Region region, final int numWrites,
+    final int numReads) throws IOException, RpcThrottlingException {
     Optional<User> user = RpcServer.getRequestUser();
     UserGroupInformation ugi;
     if (user.isPresent()) {
@@ -229,11 +272,10 @@ public class RegionServerRpcQuotaManager {
 
     OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
     try {
-      quota.checkQuota(numWrites, numReads, numScans);
+      quota.checkBatchQuota(numWrites, numReads);
     } catch (RpcThrottlingException e) {
-      LOG.debug(
-        "Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites="
-          + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + e.getMessage());
+      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
+        + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage());
       throw e;
     }
     return quota;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 8ae2cae0188..483edbcd3a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -243,6 +243,11 @@ public class TimeBasedLimiter implements QuotaLimiter {
     return readSizeLimiter.getAvailable();
   }
 
+  @Override
+  public long getReadLimit() {
+    return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7043b78c048..a2b9a93263d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -429,6 +429,9 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
     private boolean fullRegionScan;
     private final String clientIPAndPort;
     private final String userName;
+    private volatile long maxBlockBytesScanned = 0;
+    private volatile long prevBlockBytesScanned = 0;
+    private volatile long prevBlockBytesScannedDifference = 0;
 
     RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
       RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
@@ -452,6 +455,22 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
       return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
     }
 
+    long getMaxBlockBytesScanned() {
+      return maxBlockBytesScanned;
+    }
+
+    long getPrevBlockBytesScannedDifference() {
+      return prevBlockBytesScannedDifference;
+    }
+
+    void updateBlockBytesScanned(long blockBytesScanned) {
+      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
+      prevBlockBytesScanned = blockBytesScanned;
+      if (blockBytesScanned > maxBlockBytesScanned) {
+        maxBlockBytesScanned = blockBytesScanned;
+      }
+    }
+
     // Should be called only when we need to print lease expired messages otherwise
     // cache the String once made.
     @Override
@@ -2466,7 +2485,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
       }
       Boolean existence = null;
       Result r = null;
-      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
+      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);
 
       Get clientGet = ProtobufUtil.toGet(get);
       if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
@@ -2683,7 +2702,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
 
       try {
         region = getRegion(regionSpecifier);
-        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
           regionAction.hasCondition());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
@@ -2746,7 +2765,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
 
       try {
         region = getRegion(regionSpecifier);
-        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
           regionAction.hasCondition());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
@@ -2931,7 +2950,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
       }
       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
       OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
-      quota = getRpcQuotaManager().checkQuota(region, operationType);
+      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
       ActivePolicyEnforcement spaceQuotaEnforcement =
         getSpaceQuotaManager().getActiveEnforcements();
 
@@ -3487,6 +3506,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
       if (rpcCall != null) {
         responseCellSize = rpcCall.getResponseCellSize();
         blockBytesScanned = rpcCall.getBlockBytesScanned();
+        rsh.updateBlockBytesScanned(blockBytesScanned);
       }
       region.getMetrics().updateScan();
       final MetricsRegionServer metricsRegionServer = server.getMetrics();
@@ -3590,7 +3610,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
     }
     OperationQuota quota;
     try {
-      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+      quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
+        rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
     } catch (IOException e) {
       addScannerLeaseBack(lease);
       throw new ServiceException(e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
index 5de9a2d1a90..c058abe214c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
 import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
 import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
 import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
+import static org.junit.Assert.assertTrue;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -60,12 +61,17 @@ public class TestBlockBytesScannedQuota {
   private static final byte[] QUALIFIER = Bytes.toBytes("q");
 
   private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest");
+  private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // client should fail fast
     TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
+      MAX_SCANNER_RESULT_SIZE);
+    TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY,
+      AverageIntervalRateLimiter.class, RateLimiter.class);
 
     // quotas enabled, using block bytes scanned
     TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
@@ -140,27 +146,75 @@ public class TestBlockBytesScannedQuota {
     waitMinuteQuota();
 
     // should execute 1 request
-    testTraffic(() -> doScans(5, table), 1, 0);
+    testTraffic(() -> doScans(5, table, 1), 1, 0);
 
     // Remove all the limits
     admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
     triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
-    testTraffic(() -> doScans(100, table), 100, 0);
-    testTraffic(() -> doScans(100, table), 100, 0);
+    testTraffic(() -> doScans(100, table, 1), 100, 0);
+    testTraffic(() -> doScans(100, table, 1), 100, 0);
 
     // Add ~3 block/sec limit. This should support >1 scans
     admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
       Math.round(3.1 * blockSize), TimeUnit.SECONDS));
     triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // Add 50 block/sec limit. This should support >1 scans
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
+      Math.round(50.1 * blockSize), TimeUnit.SECONDS));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // This will produce some throttling exceptions, but all/most should succeed within the timeout
+    testTraffic(() -> doScans(100, table, 1), 75, 25);
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // With large caching, a big scan should succeed
+    testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0);
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+    testTraffic(() -> doScans(100, table, 1), 100, 0);
+    testTraffic(() -> doScans(100, table, 1), 100, 0);
+  }
+
+  @Test
+  public void testSmallScanNeverBlockedByLargeEstimate() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final String userName = User.getCurrent().getShortName();
+    Table table = admin.getConnection().getTable(TABLE_NAME);
 
-    // should execute some requests, but not all
-    testTraffic(() -> doScans(100, table), 100, 90);
+    doPuts(10_000, FAMILY, QUALIFIER, table);
+    TEST_UTIL.flush(TABLE_NAME);
+
+    // Add 99MB/sec limit.
+    // This should never be blocked, but with a sequence number approaching 10k, without
+    // other intervention, we would estimate a scan workload approaching 625MB or the
+    // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all
+    // requests succeed, so the estimate never becomes large enough to cause read downtime
+    long limit = 99 * 1024 * 1024;
+    assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code
+                                                  // changes
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit,
+      TimeUnit.SECONDS));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // should execute all requests
+    testTraffic(() -> doScans(10_000, table, 1), 10_000, 0);
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
 
     // Remove all the limits
     admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
     triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
-    testTraffic(() -> doScans(100, table), 100, 0);
-    testTraffic(() -> doScans(100, table), 100, 0);
+    testTraffic(() -> doScans(100, table, 1), 100, 0);
+    testTraffic(() -> doScans(100, table, 1), 100, 0);
   }
 
   @Test
@@ -223,9 +277,8 @@ public class TestBlockBytesScannedQuota {
       boolean success = (actualSuccess >= expectedSuccess - marginOfError)
         && (actualSuccess <= expectedSuccess + marginOfError);
       if (!success) {
-        triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+        triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
         waitMinuteQuota();
-        Thread.sleep(15_000L);
       }
       return success;
     });
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
new file mode 100644
index 00000000000..4684be02d69
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDefaultOperationQuota {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);
+
+  @Test
+  public void testScanEstimateNewScanner() {
+    long blockSize = 64 * 1024;
+    long nextCallSeq = 0;
+    long maxScannerResultSize = 100 * 1024 * 1024;
+    long maxBlockBytesScanned = 0;
+    long prevBBSDifference = 0;
+    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+    // new scanner should estimate scan read as 1 block
+    assertEquals(blockSize, estimate);
+  }
+
+  @Test
+  public void testScanEstimateSecondNextCall() {
+    long blockSize = 64 * 1024;
+    long nextCallSeq = 1;
+    long maxScannerResultSize = 100 * 1024 * 1024;
+    long maxBlockBytesScanned = 10 * blockSize;
+    long prevBBSDifference = 10 * blockSize;
+    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+    // 2nd next call should be estimated at maxBBS
+    assertEquals(maxBlockBytesScanned, estimate);
+  }
+
+  @Test
+  public void testScanEstimateFlatWorkload() {
+    long blockSize = 64 * 1024;
+    long nextCallSeq = 100;
+    long maxScannerResultSize = 100 * 1024 * 1024;
+    long maxBlockBytesScanned = 10 * blockSize;
+    long prevBBSDifference = 0;
+    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+    // flat workload should not overestimate
+    assertEquals(maxBlockBytesScanned, estimate);
+  }
+
+  @Test
+  public void testScanEstimateVariableFlatWorkload() {
+    long blockSize = 64 * 1024;
+    long nextCallSeq = 1;
+    long maxScannerResultSize = 100 * 1024 * 1024;
+    long maxBlockBytesScanned = 10 * blockSize;
+    long prevBBSDifference = 0;
+    for (int i = 0; i < 100; i++) {
+      long variation = Math.round(Math.random() * blockSize);
+      if (variation % 2 == 0) {
+        variation *= -1;
+      }
+      // despite +/- <1 block variation, we consider this workload flat
+      prevBBSDifference = variation;
+
+      long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i,
+        maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+      // flat workload should not overestimate
+      assertEquals(maxBlockBytesScanned, estimate);
+    }
+  }
+
+  @Test
+  public void testScanEstimateGrowingWorkload() {
+    long blockSize = 64 * 1024;
+    long nextCallSeq = 100;
+    long maxScannerResultSize = 100 * 1024 * 1024;
+    long maxBlockBytesScanned = 20 * blockSize;
+    long prevBBSDifference = 10 * blockSize;
+    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+    // growing workload should overestimate
+    assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate);
+  }
+
+  @Test
+  public void testScanEstimateShrinkingWorkload() {
+    long blockSize = 64 * 1024;
+    long nextCallSeq = 100;
+    long maxScannerResultSize = 100 * 1024 * 1024;
+    long maxBlockBytesScanned = 20 * blockSize;
+    long prevBBSDifference = -10 * blockSize;
+    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+    // shrinking workload should only shrink estimate to maxBBS
+    assertEquals(maxBlockBytesScanned, estimate);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
index ff34c52386b..8da2989921a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
@@ -152,22 +152,21 @@ public final class ThrottleQuotaTestUtil {
     return opCount;
   }
 
-  static long doScans(int maxOps, Table table) {
+  static long doScans(int desiredRows, Table table, int caching) {
     int count = 0;
-    int caching = 100;
     try {
       Scan scan = new Scan();
       scan.setCaching(caching);
       scan.setCacheBlocks(false);
       ResultScanner scanner = table.getScanner(scan);
-      while (count < (maxOps * caching)) {
+      while (count < desiredRows) {
         scanner.next();
         count += 1;
       }
     } catch (IOException e) {
       LOG.error("scan failed after nRetries=" + count, e);
     }
-    return count / caching;
+    return count;
   }
 
   static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,

