This is an automated email from the ASF dual-hosted git repository.
rmattingly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new d82a5913955 HBASE-29231 Throttles should support limits based on
handler thread usage time (#7000)
d82a5913955 is described below
commit d82a5913955dd3856892b164b40a59c851238641
Author: Alex Hughes <[email protected]>
AuthorDate: Tue Jun 3 22:04:42 2025 +0200
HBASE-29231 Throttles should support limits based on handler thread usage
time (#7000)
Co-authored-by: Alex Hughes <[email protected]>
Signed-off-by: Ray Mattingly <[email protected]>
---
.../hadoop/hbase/quotas/QuotaSettingsFactory.java | 4 +
.../hbase/quotas/RpcThrottlingException.java | 9 +-
.../hadoop/hbase/quotas/ThrottleSettings.java | 3 +
.../apache/hadoop/hbase/quotas/ThrottleType.java | 3 +
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 4 +
.../src/main/protobuf/server/Quota.proto | 3 +
.../hadoop/hbase/quotas/DefaultOperationQuota.java | 56 +++++++--
.../hadoop/hbase/quotas/ExceedOperationQuota.java | 14 ++-
.../hbase/quotas/GlobalQuotaSettingsImpl.java | 15 +++
.../hadoop/hbase/quotas/NoopQuotaLimiter.java | 10 +-
.../apache/hadoop/hbase/quotas/QuotaLimiter.java | 59 ++++++----
.../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 4 +
.../hbase/quotas/RegionServerRpcQuotaManager.java | 15 ++-
.../hadoop/hbase/quotas/TimeBasedLimiter.java | 27 ++++-
.../hbase/quotas/TestDefaultHandlerUsageQuota.java | 123 +++++++++++++++++++
.../hbase/quotas/TestDefaultOperationQuota.java | 24 ++--
.../apache/hadoop/hbase/quotas/TestQuotaState.java | 8 +-
.../hbase/quotas/TestThreadHandlerUsageQuota.java | 131 +++++++++++++++++++++
18 files changed, 459 insertions(+), 53 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
index 67f85da1d51..cea5fc7cb86 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
@@ -180,6 +180,10 @@ public class QuotaSettingsFactory {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName,
namespace, regionServer,
ThrottleType.ATOMIC_REQUEST_NUMBER, throttle.getAtomicReqNum()));
}
+ if (throttle.hasReqHandlerUsageMs()) {
+ settings.add(ThrottleSettings.fromTimedQuota(userName, tableName,
namespace, regionServer,
+ ThrottleType.REQUEST_HANDLER_USAGE_MS,
throttle.getReqHandlerUsageMs()));
+ }
return settings;
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
index dfa8eacb13b..d4ab38f5bf7 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
@@ -44,13 +44,15 @@ public class RpcThrottlingException extends
HBaseIOException {
AtomicRequestNumberExceeded,
AtomicReadSizeExceeded,
AtomicWriteSizeExceeded,
+ RequestHandlerUsageTimeExceeded,
}
private static final String[] MSG_TYPE = new String[] { "number of requests
exceeded",
"request size limit exceeded", "number of read requests exceeded",
"number of write requests exceeded", "write size limit exceeded", "read
size limit exceeded",
"request capacity unit exceeded", "read capacity unit exceeded", "write
capacity unit exceeded",
- "atomic request number exceeded", "atomic read size exceeded", "atomic
write size exceeded" };
+ "atomic request number exceeded", "atomic read size exceeded", "atomic
write size exceeded",
+ "request handler usage time exceeded" };
private static final String MSG_WAIT = " - wait ";
@@ -145,6 +147,11 @@ public class RpcThrottlingException extends
HBaseIOException {
throwThrottlingException(Type.AtomicWriteSizeExceeded, waitInterval);
}
+ public static void throwRequestHandlerUsageTimeExceeded(final long
waitInterval)
+ throws RpcThrottlingException {
+ throwThrottlingException(Type.RequestHandlerUsageTimeExceeded,
waitInterval);
+ }
+
private static void throwThrottlingException(final Type type, final long
waitInterval)
throws RpcThrottlingException {
String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT +
stringFromMillis(waitInterval);
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
index efde451c122..336848690d6 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
@@ -108,6 +108,9 @@ public class ThrottleSettings extends QuotaSettings {
case WRITE_CAPACITY_UNIT:
builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
break;
+ case REQUEST_HANDLER_USAGE_MS:
+ builder.append(String.format("%dms", timedQuota.getSoftLimit()));
+ break;
default:
}
} else if (timedQuota.hasShare()) {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
index 2c5a25acc2c..277451a3e2e 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
@@ -59,4 +59,7 @@ public enum ThrottleType {
/** Throttling based on the size of atomic write requests */
ATOMIC_WRITE_SIZE,
+
+ /** Throttling based on the handler thread time in milliseconds used */
+ REQUEST_HANDLER_USAGE_MS,
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 616e0e37457..60175137ad2 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2496,6 +2496,8 @@ public final class ProtobufUtil {
return ThrottleType.ATOMIC_REQUEST_NUMBER;
case ATOMIC_WRITE_SIZE:
return ThrottleType.ATOMIC_WRITE_SIZE;
+ case REQUEST_HANDLER_USAGE_MS:
+ return ThrottleType.REQUEST_HANDLER_USAGE_MS;
default:
throw new RuntimeException("Invalid ThrottleType " + proto);
}
@@ -2531,6 +2533,8 @@ public final class ProtobufUtil {
return QuotaProtos.ThrottleType.ATOMIC_REQUEST_NUMBER;
case ATOMIC_WRITE_SIZE:
return QuotaProtos.ThrottleType.ATOMIC_WRITE_SIZE;
+ case REQUEST_HANDLER_USAGE_MS:
+ return QuotaProtos.ThrottleType.REQUEST_HANDLER_USAGE_MS;
default:
throw new RuntimeException("Invalid ThrottleType " + type);
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto
b/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto
index e524e015b62..f28cb701646 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto
@@ -52,6 +52,7 @@ enum ThrottleType {
ATOMIC_READ_SIZE = 10;
ATOMIC_REQUEST_NUMBER = 11;
ATOMIC_WRITE_SIZE = 12;
+ REQUEST_HANDLER_USAGE_MS = 13;
}
message Throttle {
@@ -71,6 +72,8 @@ message Throttle {
optional TimedQuota atomic_read_size = 10;
optional TimedQuota atomic_req_num = 11;
optional TimedQuota atomic_write_size = 12;
+
+ optional TimedQuota req_handler_usage_ms = 13;
}
message ThrottleRequest {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index f153eca2e5a..16082bff98f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -21,10 +21,12 @@ import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -45,11 +47,18 @@ public class DefaultOperationQuota implements
OperationQuota {
// the available read/write quota size in bytes
protected long readAvailable = 0;
+
+ // The estimated handler usage time in ms for a request based on
+ // the number of requests per second and the number of handler threads
+ private final long estimatedHandlerUsagePerReq;
+
// estimated quota
protected long writeConsumed = 0;
protected long readConsumed = 0;
protected long writeCapacityUnitConsumed = 0;
protected long readCapacityUnitConsumed = 0;
+ protected long handlerUsageTimeConsumed = 0;
+
// real consumed quota
private final long[] operationSize;
// difference between estimated quota and real consumed quota used in close
method
@@ -59,14 +68,15 @@ public class DefaultOperationQuota implements
OperationQuota {
protected long readDiff = 0;
protected long writeCapacityUnitDiff = 0;
protected long readCapacityUnitDiff = 0;
+ protected long handlerUsageTimeDiff = 0;
private boolean useResultSizeBytes;
private long blockSizeBytes;
private long maxScanEstimate;
private boolean isAtomic = false;
public DefaultOperationQuota(final Configuration conf, final int
blockSizeBytes,
- final QuotaLimiter... limiters) {
- this(conf, Arrays.asList(limiters));
+ final double requestsPerSecond, final QuotaLimiter... limiters) {
+ this(conf, requestsPerSecond, Arrays.asList(limiters));
this.useResultSizeBytes =
conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES,
USE_RESULT_SIZE_BYTES_DEFAULT);
this.blockSizeBytes = blockSizeBytes;
@@ -78,15 +88,20 @@ public class DefaultOperationQuota implements
OperationQuota {
/**
* NOTE: The order matters. It should be something like [user, table,
namespace, global]
*/
- public DefaultOperationQuota(final Configuration conf, final
List<QuotaLimiter> limiters) {
+ public DefaultOperationQuota(final Configuration conf, final double
requestsPerSecond,
+ final List<QuotaLimiter> limiters) {
this.writeCapacityUnit =
conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY,
QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
this.readCapacityUnit =
conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY,
QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
this.limiters = limiters;
+ int numHandlerThreads = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+ this.estimatedHandlerUsagePerReq =
+ calculateHandlerUsageTimeEstimate(requestsPerSecond, numHandlerThreads);
+
int size = OperationType.values().length;
operationSize = new long[size];
-
for (int i = 0; i < size; ++i) {
operationSize[i] = 0;
}
@@ -128,13 +143,13 @@ public class DefaultOperationQuota implements
OperationQuota {
limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
Math.min(maxWriteSizeToEstimate, writeConsumed),
Math.min(maxReadsToEstimate, numReads),
Math.min(maxReadSizeToEstimate, readConsumed),
writeCapacityUnitConsumed,
- readCapacityUnitConsumed, isAtomic);
+ readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}
for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed,
- readCapacityUnitConsumed, isAtomic);
+ readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
}
}
@@ -152,12 +167,12 @@ public class DefaultOperationQuota implements
OperationQuota {
RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
}
-
writeCapacityUnitDiff =
calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()],
writeConsumed);
readCapacityUnitDiff = calculateReadCapacityUnitDiff(
operationSize[OperationType.GET.ordinal()] +
operationSize[OperationType.SCAN.ordinal()],
readConsumed);
+ handlerUsageTimeDiff = calculateHandlerUsageMsDiff();
for (final QuotaLimiter limiter : limiters) {
if (writeDiff != 0) {
@@ -166,6 +181,9 @@ public class DefaultOperationQuota implements
OperationQuota {
if (readDiff != 0) {
limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic);
}
+ if (handlerUsageTimeDiff != 0) {
+ limiter.consumeTime(handlerUsageTimeDiff);
+ }
}
}
@@ -216,6 +234,8 @@ public class DefaultOperationQuota implements
OperationQuota {
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+
+ handlerUsageTimeConsumed = (numReads + numWrites) *
estimatedHandlerUsagePerReq;
}
/**
@@ -238,6 +258,7 @@ public class DefaultOperationQuota implements
OperationQuota {
}
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+ handlerUsageTimeConsumed = estimatedHandlerUsagePerReq;
}
protected static long getScanReadConsumeEstimate(long blockSizeBytes, long
nextCallSeq,
@@ -288,4 +309,25 @@ public class DefaultOperationQuota implements
OperationQuota {
private long calculateReadCapacityUnitDiff(final long actualSize, final long
estimateSize) {
return calculateReadCapacityUnit(actualSize) -
calculateReadCapacityUnit(estimateSize);
}
+
+ private long calculateHandlerUsageTimeEstimate(final double
requestsPerSecond,
+ final int numHandlerThreads) {
+ if (requestsPerSecond <= numHandlerThreads) {
+ // If less than 1 request per second per handler thread, then we use the
number of handler
+ // threads as a baseline to avoid incorrect estimations when the number
of requests is very
+ // low.
+ return numHandlerThreads;
+ } else {
+ double requestsPerMillisecond = Math.ceil(requestsPerSecond / 1000);
+ // We don't ever want zero here
+ return Math.max((long) requestsPerMillisecond, 1L);
+ }
+ }
+
+ private long calculateHandlerUsageMsDiff() {
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ long startTime =
RpcServer.getCurrentCall().map(RpcCall::getStartTime).orElse(currentTime);
+ long timeElapsed = currentTime - startTime;
+ return handlerUsageTimeConsumed - timeElapsed;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
index 7dcfec6b062..eb8824685c6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
@@ -43,8 +43,9 @@ public class ExceedOperationQuota extends
DefaultOperationQuota {
private QuotaLimiter regionServerLimiter;
public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
- QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) {
- super(conf, blockSizeBytes, limiters);
+ final double requestsPerSecond, QuotaLimiter regionServerLimiter,
+ final QuotaLimiter... limiters) {
+ super(conf, blockSizeBytes, requestsPerSecond, limiters);
this.regionServerLimiter = regionServerLimiter;
}
@@ -78,7 +79,7 @@ public class ExceedOperationQuota extends
DefaultOperationQuota {
estimateQuota.run();
// 2. Check if region server limiter is enough. If not, throw
RpcThrottlingException.
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads +
numScans, readConsumed,
- writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic);
+ writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic,
handlerUsageTimeConsumed);
// 3. Check if other limiters are enough. If not, exceed other limiters
because region server
// limiter is enough.
boolean exceed = false;
@@ -94,13 +95,13 @@ public class ExceedOperationQuota extends
DefaultOperationQuota {
// 4. Region server limiter is enough and grab estimated consume quota.
readAvailable = Math.max(readAvailable,
regionServerLimiter.getReadAvailable());
regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads +
numScans, readConsumed,
- writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic);
+ writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic,
handlerUsageTimeConsumed);
if (exceed) {
// 5. Other quota limiter is exceeded and has not been grabbed
(because throw
// RpcThrottlingException in Step 3), so grab it.
for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans,
readConsumed,
- writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic);
+ writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic,
0L);
}
}
}
@@ -115,6 +116,9 @@ public class ExceedOperationQuota extends
DefaultOperationQuota {
if (readDiff != 0) {
regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff, false);
}
+ if (handlerUsageTimeDiff != 0) {
+ regionServerLimiter.consumeTime(handlerUsageTimeDiff);
+ }
}
private interface CheckQuotaRunnable {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
index 6afbebc6e86..2ce17452a32 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
@@ -174,6 +174,11 @@ public class GlobalQuotaSettingsImpl extends
GlobalQuotaSettings {
hasThrottle = true;
}
break;
+ case REQUEST_HANDLER_USAGE_MS:
+ if (throttleBuilder.hasReqHandlerUsageMs()) {
+ hasThrottle = true;
+ }
+ break;
default:
}
return hasThrottle;
@@ -236,6 +241,9 @@ public class GlobalQuotaSettingsImpl extends
GlobalQuotaSettings {
case ATOMIC_WRITE_SIZE:
throttleBuilder.clearAtomicWriteSize();
break;
+ case REQUEST_HANDLER_USAGE_MS:
+ throttleBuilder.clearReqHandlerUsageMs();
+ break;
default:
}
boolean hasThrottle = false;
@@ -295,6 +303,8 @@ public class GlobalQuotaSettingsImpl extends
GlobalQuotaSettings {
case ATOMIC_WRITE_SIZE:
throttleBuilder.setAtomicWriteSize(otherProto.getTimedQuota());
break;
+ case REQUEST_HANDLER_USAGE_MS:
+ throttleBuilder.setReqHandlerUsageMs(otherProto.getTimedQuota());
default:
}
}
@@ -388,7 +398,12 @@ public class GlobalQuotaSettingsImpl extends
GlobalQuotaSettings {
case READ_CAPACITY_UNIT:
case WRITE_CAPACITY_UNIT:
builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+ break;
+ case REQUEST_HANDLER_USAGE_MS:
+ builder.append(String.format("%dms", timedQuota.getSoftLimit()));
+ break;
default:
+ // no-op
}
} else if (timedQuota.hasShare()) {
builder.append(String.format("%.2f%%", timedQuota.getShare()));
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 7c02dbc1134..b75dfbbb7dd 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -35,13 +35,14 @@ class NoopQuotaLimiter implements QuotaLimiter {
@Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long
estimateReadCapacityUnit,
- boolean isAtomic) throws RpcThrottlingException {
+ boolean isAtomic, long estimateHandlerThreadUsageMs) throws
RpcThrottlingException {
// no-op
}
@Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long
readSize,
- long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) {
+ long writeCapacityUnit, long readCapacityUnit, boolean isAtomic,
+ long estimateHandlerThreadUsageMs) {
// no-op
}
@@ -55,6 +56,11 @@ class NoopQuotaLimiter implements QuotaLimiter {
// no-op
}
+ @Override
+ public void consumeTime(final long handlerMillisUsed) {
+ // no-op
+ }
+
@Override
public boolean isBypass() {
return true;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index 1b5a1302a20..a98a1ca59b2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -28,36 +28,48 @@ import org.apache.yetus.audience.InterfaceStability;
public interface QuotaLimiter {
/**
* Checks if it is possible to execute the specified operation.
- * @param writeReqs the write requests that will be checked
against the available
- * quota
- * @param estimateWriteSize the write size that will be checked
against the available
- * quota
- * @param readReqs the read requests that will be checked
against the available
- * quota
- * @param estimateReadSize the read size that will be checked
against the available quota
- * @param estimateWriteCapacityUnit the write capacity unit that will be
checked against the
- * available quota
- * @param estimateReadCapacityUnit the read capacity unit that will be
checked against the
- * available quota
+ * @param writeReqs the write requests that will be
checked against the
+ * available quota
+ * @param estimateWriteSize the write size that will be checked
against the available
+ * quota
+ * @param readReqs the read requests that will be
checked against the
+ * available quota
+ * @param estimateReadSize the read size that will be checked
against the available
+ * quota
+ * @param estimateWriteCapacityUnit the write capacity unit that will be
checked against the
+ * available quota
+ * @param estimateReadCapacityUnit the read capacity unit that will be
checked against the
+ * available quota
+ * @param isAtomic if the request performs an atomic
operation
+ * @param estimateHandlerThreadUsageMs the estimated handler usage time in
ms that will be checked
+ * against the available quota
* @throws RpcThrottlingException thrown if not enough available resources
to perform operation.
*/
void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long
estimateReadSize,
- long estimateWriteCapacityUnit, long estimateReadCapacityUnit, boolean
isAtomic)
- throws RpcThrottlingException;
+ long estimateWriteCapacityUnit, long estimateReadCapacityUnit, boolean
isAtomic,
+ long estimateHandlerThreadUsageMs) throws RpcThrottlingException;
/**
* Removes the specified write and read amount from the quota. At this point
the write and read
* amount will be an estimate, that will be later adjusted with a
consumeWrite()/consumeRead()
* call.
- * @param writeReqs the write requests that will be removed from the
current quota
- * @param writeSize the write size that will be removed from the
current quota
- * @param readReqs the read requests that will be removed from the
current quota
- * @param readSize the read size that will be removed from the
current quota
- * @param writeCapacityUnit the write capacity unit that will be removed
from the current quota
- * @param readCapacityUnit the read capacity unit num that will be removed
from the current quota
+ * @param writeReqs the write requests that will be
removed from the current
+ * quota
+ * @param writeSize the write size that will be removed
from the current quota
+ * @param readReqs the read requests that will be
removed from the current
+ * quota
+ * @param readSize the read size that will be removed
from the current quota
+ * @param writeCapacityUnit the write capacity unit that will be
removed from the
+ * current quota
+ * @param readCapacityUnit the read capacity unit num that will
be removed from the
+ * current quota
+ * @param isAtomic if the request performs an atomic
operation
+ * @param estimateHandlerThreadUsageMs the estimated handler usage time in
ms that will be removed
+ * from the available quota
*/
void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
- long writeCapacityUnit, long readCapacityUnit, boolean isAtomic);
+ long writeCapacityUnit, long readCapacityUnit, boolean isAtomic,
+ long estimateHandlerThreadUsageMs);
/**
* Removes or add back some write amount to the quota. (called at the end of
an operation in case
@@ -71,6 +83,13 @@ public interface QuotaLimiter {
*/
void consumeRead(long size, long capacityUnit, boolean isAtomic);
+ /**
+ * Removes or add back some handler thread usage milliseconds to the quota.
(called at the end of
+ * an operation in case the estimate quota was off)
+ * @param handlerMillisUsed the actual elapsed time used processing the
request
+ */
+ void consumeTime(long handlerMillisUsed);
+
/** Returns true if the limiter is a noop */
boolean isBypass();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 222645ca998..68752278383 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -102,6 +102,8 @@ public class QuotaUtil extends QuotaTableUtil {
"hbase.quota.default.user.machine.atomic.request.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE =
"hbase.quota.default.user.machine.atomic.write.size";
+ public static final String
QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS =
+ "hbase.quota.default.user.machine.request.handler.usage.ms";
/** Table descriptor for Quota internal table */
public static final TableDescriptor QUOTA_TABLE_DESC =
@@ -401,6 +403,8 @@ public class QuotaUtil extends QuotaTableUtil {
.ifPresent(throttleBuilder::setAtomicReqNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE)
.ifPresent(throttleBuilder::setAtomicWriteSize);
+ buildDefaultTimedQuota(conf,
QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS)
+ .ifPresent(throttleBuilder::setReqHandlerUsageMs);
UserQuotaState state = new UserQuotaState(nowTs);
QuotaProtos.Quotas defaultQuotas =
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index d847a9eb3dc..7c23666490d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
@@ -33,6 +35,8 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
@@ -52,11 +56,15 @@ public class RegionServerRpcQuotaManager implements
RpcQuotaManager {
private volatile boolean rpcThrottleEnabled;
// Storage for quota rpc throttle
private RpcThrottleStorage rpcThrottleStorage;
+ private final Supplier<Double> requestsPerSecondSupplier;
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
this.rsServices = rsServices;
rpcThrottleStorage =
new RpcThrottleStorage(rsServices.getZooKeeper(),
rsServices.getConfiguration());
+ this.requestsPerSecondSupplier = Suppliers.memoizeWithExpiration(
+ () ->
rsServices.getMetrics().getRegionServerWrapper().getRequestsPerSecond(), 1,
+ TimeUnit.MINUTES);
}
public void start(final RpcScheduler rpcScheduler) throws IOException {
@@ -119,6 +127,7 @@ public class RegionServerRpcQuotaManager implements
RpcQuotaManager {
if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
+
boolean useNoop = userLimiter.isBypass();
if (userQuotaState.hasBypassGlobals()) {
if (LOG.isTraceEnabled()) {
@@ -126,7 +135,7 @@ public class RegionServerRpcQuotaManager implements
RpcQuotaManager {
}
if (!useNoop) {
return new DefaultOperationQuota(this.rsServices.getConfiguration(),
blockSizeBytes,
- userLimiter);
+ requestsPerSecondSupplier.get(), userLimiter);
}
} else {
QuotaLimiter nsLimiter =
quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@@ -143,10 +152,10 @@ public class RegionServerRpcQuotaManager implements
RpcQuotaManager {
if (!useNoop) {
if (exceedThrottleQuotaEnabled) {
return new
ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
- rsLimiter, userLimiter, tableLimiter, nsLimiter);
+ requestsPerSecondSupplier.get(), rsLimiter, userLimiter,
tableLimiter, nsLimiter);
} else {
return new
DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
- userLimiter, tableLimiter, nsLimiter, rsLimiter);
+ requestsPerSecondSupplier.get(), userLimiter, tableLimiter,
nsLimiter, rsLimiter);
}
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index e62d98242e4..232471092c2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -45,6 +45,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
private RateLimiter atomicReqLimiter = null;
private RateLimiter atomicReadSizeLimiter = null;
private RateLimiter atomicWriteSizeLimiter = null;
+ private RateLimiter reqHandlerUsageTimeLimiter = null;
private TimeBasedLimiter() {
if (
@@ -66,6 +67,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
atomicReqLimiter = new FixedIntervalRateLimiter(refillInterval);
atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
+ reqHandlerUsageTimeLimiter = new
FixedIntervalRateLimiter(refillInterval);
} else {
reqsLimiter = new AverageIntervalRateLimiter();
reqSizeLimiter = new AverageIntervalRateLimiter();
@@ -79,6 +81,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
atomicReqLimiter = new AverageIntervalRateLimiter();
atomicReadSizeLimiter = new AverageIntervalRateLimiter();
atomicWriteSizeLimiter = new AverageIntervalRateLimiter();
+ reqHandlerUsageTimeLimiter = new AverageIntervalRateLimiter();
}
}
@@ -145,6 +148,11 @@ public class TimeBasedLimiter implements QuotaLimiter {
isBypass = false;
}
+ if (throttle.hasReqHandlerUsageMs()) {
+ setFromTimedQuota(limiter.reqHandlerUsageTimeLimiter,
throttle.getReqHandlerUsageMs());
+ isBypass = false;
+ }
+
return isBypass ? NoopQuotaLimiter.get() : limiter;
}
@@ -161,6 +169,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
atomicReqLimiter.update(other.atomicReqLimiter);
atomicReadSizeLimiter.update(other.atomicReadSizeLimiter);
atomicWriteSizeLimiter.update(other.atomicWriteSizeLimiter);
+ reqHandlerUsageTimeLimiter.update(other.reqHandlerUsageTimeLimiter);
}
private static void setFromTimedQuota(final RateLimiter limiter, final
TimedQuota timedQuota) {
@@ -170,7 +179,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
@Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long
estimateReadCapacityUnit,
- boolean isAtomic) throws RpcThrottlingException {
+ boolean isAtomic, long estimatedReqHandlerUsageTimeMs) throws
RpcThrottlingException {
long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
@@ -232,11 +241,16 @@ public class TimeBasedLimiter implements QuotaLimiter {
}
}
}
+ waitInterval = reqHandlerUsageTimeLimiter.getWaitIntervalMs(estimatedReqHandlerUsageTimeMs);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwRequestHandlerUsageTimeExceeded(waitInterval);
+ }
}
@Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
- long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) {
+ long writeCapacityUnit, long readCapacityUnit, boolean isAtomic,
+ long estimateHandlerThreadUsageMs) {
assert writeSize != 0 || readSize != 0;
reqsLimiter.consume(writeReqs + readReqs);
@@ -267,6 +281,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
atomicWriteSizeLimiter.consume(writeSize);
}
}
+ reqHandlerUsageTimeLimiter.consume(estimateHandlerThreadUsageMs);
}
@Override
@@ -291,6 +306,11 @@ public class TimeBasedLimiter implements QuotaLimiter {
}
}
+ @Override
+ public void consumeTime(final long handlerMillisUsed) {
+ reqHandlerUsageTimeLimiter.consume(handlerMillisUsed);
+ }
+
@Override
public boolean isBypass() {
return false;
@@ -377,6 +397,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
if (!atomicWriteSizeLimiter.isBypass()) {
builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter);
}
+ if (!reqHandlerUsageTimeLimiter.isBypass()) {
+ builder.append(" reqHandlerUsageTimeLimiter=" + reqHandlerUsageTimeLimiter);
+ }
builder.append(')');
return builder.toString();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java
new file mode 100644
index 00000000000..36fb9b35403
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestDefaultHandlerUsageQuota {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDefaultHandlerUsageQuota.class);
+ private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
+ private static final int REFRESH_TIME = 5;
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+ EnvironmentEdgeManager.reset();
+ TEST_UTIL.deleteTable(TABLE_NAME);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // quotas enabled, using block bytes scanned
+ TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+ TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
+ // Set default to very strict
+ TEST_UTIL.getConfiguration()
+ .setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS, 10);
+
+ // don't cache blocks to make IO predictable
+ TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ QuotaCache.TEST_FORCE_REFRESH = true;
+ TEST_UTIL.flush(TABLE_NAME);
+ }
+
+ @Test
+ public void testDefaultHandlerUsageLimits() throws Exception {
+ // Should have a strict throttle by default
+ TEST_UTIL.waitFor(60_000, () -> runPutTest(100) < 100);
+
+ // Add big quota and should be effectively unlimited
+ configureLenientThrottle();
+ // Should run without error
+ TEST_UTIL.waitFor(60_000, () -> runPutTest(100) == 100);
+
+ // Remove all the limits, and should revert to strict default
+ unsetQuota();
+ TEST_UTIL.waitFor(60_000, () -> runPutTest(100) < 100);
+ }
+
+ private void configureLenientThrottle() throws IOException {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+ admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(),
+ ThrottleType.REQUEST_HANDLER_USAGE_MS, 100_000, TimeUnit.SECONDS));
+ }
+ }
+
+ private static String getUserName() throws IOException {
+ return User.getCurrent().getShortName();
+ }
+
+ private void unsetQuota() throws Exception {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleUser(getUserName()));
+ }
+ }
+
+ private long runPutTest(int attempts) throws Exception {
+ try (Table table = getTable()) {
+ return ThrottleQuotaTestUtil.doPuts(attempts, FAMILY, QUALIFIER, table);
+ }
+ }
+
+ private Table getTable() throws IOException {
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
+ .build();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
index beeab8aef5c..c22a03f8db0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
@@ -41,6 +41,7 @@ public class TestDefaultOperationQuota {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);
+ private static final int DEFAULT_REQUESTS_PER_SECOND = 1000;
private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
static {
envEdge.setValue(EnvironmentEdgeManager.currentTime());
@@ -150,7 +151,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+ DefaultOperationQuota quota =
+ new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
// use the whole limit
quota.checkBatchQuota(0, limit, false);
@@ -171,7 +173,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+ DefaultOperationQuota quota =
+ new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
// use the whole limit
quota.checkBatchQuota(limit, 0, false);
@@ -192,7 +195,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+ DefaultOperationQuota quota =
+ new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(0, 10 + limit, false);
@@ -213,7 +217,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+ DefaultOperationQuota quota =
+ new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(10 + limit, 0, false);
@@ -234,7 +239,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+ DefaultOperationQuota quota =
+ new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
// writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked
quota.checkBatchQuota(1, 0, false);
@@ -256,8 +262,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota =
- new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
+ DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
+ DEFAULT_REQUESTS_PER_SECOND, limiter);
// reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
quota.checkBatchQuota(0, 1, false);
@@ -279,8 +285,8 @@ public class TestDefaultOperationQuota {
QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
- DefaultOperationQuota quota =
- new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
+ DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
+ DEFAULT_REQUESTS_PER_SECOND, limiter);
// reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
quota.checkBatchQuota(0, 1, false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
index d64b1002b1e..59b26f3f0d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
@@ -224,7 +224,7 @@ public class TestQuotaState {
assertFalse(quotaInfo.isBypass());
QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
try {
- limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0, false);
+ limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0, false, 0L);
fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) {
// expected
@@ -241,7 +241,7 @@ public class TestQuotaState {
private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
assertNoThrottleException(limiter, availReqs);
try {
- limiter.checkQuota(1, 1, 0, 0, 1, 0, false);
+ limiter.checkQuota(1, 1, 0, 0, 1, 0, false, 0L);
fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) {
// expected
@@ -251,11 +251,11 @@ public class TestQuotaState {
private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
for (int i = 0; i < availReqs; ++i) {
try {
- limiter.checkQuota(1, 1, 0, 0, 1, 0, false);
+ limiter.checkQuota(1, 1, 0, 0, 1, 0, false, 0L);
} catch (RpcThrottlingException e) {
fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
}
- limiter.grabQuota(1, 1, 0, 0, 1, 0, false);
+ limiter.grabQuota(1, 1, 0, 0, 1, 0, false, 0L);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java
new file mode 100644
index 00000000000..5c446b6d7c2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestThreadHandlerUsageQuota {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestThreadHandlerUsageQuota.class);
+ private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
+ private static final int REFRESH_TIME = 5;
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+ private static final int MAX_OPS = 1000;
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+ EnvironmentEdgeManager.reset();
+ TEST_UTIL.deleteTable(TABLE_NAME);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Enable quotas
+ TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+ TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
+
+ // Don't cache blocks to make IO predictable
+ TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ QuotaCache.TEST_FORCE_REFRESH = true;
+ TEST_UTIL.flush(TABLE_NAME);
+ }
+
+ @Test
+ public void testHandlerUsageThrottleForReads() throws Exception {
+ try (Table table = getTable()) {
+ unthrottleUser();
+ long unthrottledAttempts = ThrottleQuotaTestUtil.doGets(MAX_OPS, FAMILY, QUALIFIER, table);
+
+ configureThrottle();
+ long throttledAttempts = ThrottleQuotaTestUtil.doGets(MAX_OPS, FAMILY, QUALIFIER, table);
+ assertTrue("Throttled attempts should be less than unthrottled attempts",
+ throttledAttempts < unthrottledAttempts);
+ }
+ }
+
+ @Test
+ public void testHandlerUsageThrottleForWrites() throws Exception {
+ try (Table table = getTable()) {
+ unthrottleUser();
+ long unthrottledAttempts = ThrottleQuotaTestUtil.doPuts(MAX_OPS, FAMILY, QUALIFIER, table);
+
+ configureThrottle();
+ long throttledAttempts = ThrottleQuotaTestUtil.doPuts(MAX_OPS, FAMILY, QUALIFIER, table);
+ assertTrue("Throttled attempts should be less than unthrottled attempts",
+ throttledAttempts < unthrottledAttempts);
+ }
+ }
+
+ private void configureThrottle() throws IOException {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+ admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(),
+ ThrottleType.REQUEST_HANDLER_USAGE_MS, 10000, TimeUnit.SECONDS));
+ }
+ }
+
+ private void unthrottleUser() throws Exception {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleUserByThrottleType(getUserName(),
+ ThrottleType.REQUEST_HANDLER_USAGE_MS));
+ }
+ }
+
+ private static String getUserName() throws IOException {
+ return User.getCurrent().getShortName();
+ }
+
+ private Table getTable() throws IOException {
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
+ .build();
+ }
+}