HBASE-19924 HBase RPC throttling does not work for multi() with the request count rate limiter.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62a3434b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62a3434b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62a3434b Branch: refs/heads/branch-1.3 Commit: 62a3434b9bb849395b45e90cf17f56a74eb96c00 Parents: dead8de Author: Huaxiang Sun <h...@cloudera.com> Authored: Thu Apr 26 10:43:59 2018 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Thu Apr 26 13:27:58 2018 -0700 ---------------------------------------------------------------------- .../hbase/quotas/DefaultOperationQuota.java | 4 +-- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 5 +-- .../hadoop/hbase/quotas/QuotaLimiter.java | 8 +++-- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 37 +++++++++++--------- .../hadoop/hbase/quotas/TestQuotaState.java | 36 ++++++++++++++++--- 5 files changed, 63 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/62a3434b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 6caac74..45a72d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -61,13 +61,13 @@ public class DefaultOperationQuota implements OperationQuota { for (final QuotaLimiter limiter : limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(writeConsumed, readConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); 
writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); } for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(writeConsumed, readConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/62a3434b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index e72f3d2..c413471 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -28,12 +28,13 @@ final class NoopQuotaLimiter implements QuotaLimiter { } @Override - public void checkQuota(long estimateWriteSize, long estimateReadSize) throws ThrottlingException { + public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, + long estimateReadSize) throws ThrottlingException { // no-op } @Override - public void grabQuota(long writeSize, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { // no-op } http://git-wip-us.apache.org/repos/asf/hbase/blob/62a3434b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index a27211c..3b09cee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -31,11 +31,13 @@ public interface QuotaLimiter { /** * Checks if it is possible to 
execute the specified operation. * + * @param writeReqs the write requests that will be checked against the available quota * @param estimateWriteSize the write size that will be checked against the available quota + * @param readReqs the read requests that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota * @throws ThrottlingException thrown if not enough avialable resources to perform operation. */ - void checkQuota(long estimateWriteSize, long estimateReadSize) + void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) throws ThrottlingException; /** @@ -43,10 +45,12 @@ public interface QuotaLimiter { * At this point the write and read amount will be an estimate, * that will be later adjusted with a consumeWrite()/consumeRead() call. * + * @param writeReqs the write requests that will be removed from the current quota * @param writeSize the write size that will be removed from the current quota + * @param readReqs the read requests that will be removed from the current quota * @param readSize the read size that will be removed from the current quota */ - void grabQuota(long writeSize, long readSize); + void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize); /** * Removes or add back some write amount to the quota. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/62a3434b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 1563878..7bbe4e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -110,47 +110,50 @@ public class TimeBasedLimiter implements QuotaLimiter { } @Override - public void checkQuota(long writeSize, long readSize) throws ThrottlingException { - if (!reqsLimiter.canExecute()) { + public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, + long estimateReadSize) throws ThrottlingException { + if (!reqsLimiter.canExecute(writeReqs + readReqs)) { ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } - if (!reqSizeLimiter.canExecute(writeSize + readSize)) { - ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter - .waitInterval(writeSize + readSize)); + if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) { + ThrottlingException.throwRequestSizeExceeded( + reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); } - if (writeSize > 0) { - if (!writeReqsLimiter.canExecute()) { + if (estimateWriteSize > 0) { + if (!writeReqsLimiter.canExecute(writeReqs)) { ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); } - if (!writeSizeLimiter.canExecute(writeSize)) { - ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); + if (!writeSizeLimiter.canExecute(estimateWriteSize)) { + ThrottlingException.throwWriteSizeExceeded( + writeSizeLimiter.waitInterval(estimateWriteSize)); } } - if (readSize > 0) { - if 
(!readReqsLimiter.canExecute()) { + if (estimateReadSize > 0) { + if (!readReqsLimiter.canExecute(readReqs)) { ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); } - if (!readSizeLimiter.canExecute(readSize)) { - ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); + if (!readSizeLimiter.canExecute(estimateReadSize)) { + ThrottlingException.throwReadSizeExceeded( + readSizeLimiter.waitInterval(estimateReadSize)); } } } @Override - public void grabQuota(long writeSize, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { assert writeSize != 0 || readSize != 0; - reqsLimiter.consume(1); + reqsLimiter.consume(writeReqs + readReqs); reqSizeLimiter.consume(writeSize + readSize); if (writeSize > 0) { - writeReqsLimiter.consume(1); + writeReqsLimiter.consume(writeReqs); writeSizeLimiter.consume(writeSize); } if (readSize > 0) { - readReqsLimiter.consume(1); + readReqsLimiter.consume(readReqs); readSizeLimiter.consume(readSize); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/62a3434b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index 8d33cd2..3a9ddeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -184,6 +184,34 @@ public class TestQuotaState { assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); } + @Test(timeout = 60000) + public void testTableThrottleWithBatch() { + final TableName TABLE_A = TableName.valueOf("TableA"); + final int TABLE_A_THROTTLE_1 = 3; + final long LAST_UPDATE_1 = 10; + + UserQuotaState quotaInfo = new UserQuotaState(); + 
assertEquals(0, quotaInfo.getLastUpdate()); + assertTrue(quotaInfo.isBypass()); + + // Add A table limiters + UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE_1); + otherQuotaState.setQuotas(TABLE_A, buildReqNumThrottle(TABLE_A_THROTTLE_1)); + assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate()); + assertFalse(otherQuotaState.isBypass()); + + quotaInfo.update(otherQuotaState); + assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate()); + assertFalse(quotaInfo.isBypass()); + QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); + try { + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0); + fail("Should have thrown ThrottlingException"); + } catch (ThrottlingException e) { + // expected + } + } + private Quotas buildReqNumThrottle(final long limit) { return Quotas .newBuilder() @@ -196,7 +224,7 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1); + limiter.checkQuota(1, 1, 0, 0); fail("Should have thrown ThrottlingException"); } catch (ThrottlingException e) { // expected @@ -206,11 +234,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1); + limiter.checkQuota(1, 1, 0, 0); } catch (ThrottlingException e) { fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1); + limiter.grabQuota(1, 1, 0, 0); } } @@ -218,4 +246,4 @@ public class TestQuotaState { assertTrue(limiter == NoopQuotaLimiter.get()); assertNoThrottleException(limiter, 100); } -} \ No newline at end of file +}