This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new f0f6131b44a HBASE-28346: Expose checkQuota to Coprocessor Endpoints
(#6066)
f0f6131b44a is described below
commit f0f6131b44a06d5b210259b3fcdd24b7ebf1552e
Author: Nick Dimiduk <[email protected]>
AuthorDate: Wed Aug 7 10:06:49 2024 +0200
HBASE-28346: Expose checkQuota to Coprocessor Endpoints (#6066)
Signed-off-by: Nick Dimiduk <[email protected]>
Co-authored-by: Charles Connell <[email protected]>
---
.../coprocessor/RegionCoprocessorEnvironment.java | 52 +++++++++
.../hadoop/hbase/quotas/DefaultOperationQuota.java | 6 +
.../hadoop/hbase/quotas/NoopOperationQuota.java | 6 +
.../apache/hadoop/hbase/quotas/OperationQuota.java | 7 ++
.../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 8 ++
.../hbase/quotas/RegionServerRpcQuotaManager.java | 44 +-------
.../hadoop/hbase/quotas/RpcQuotaManager.java | 92 ++++++++++++++++
.../hbase/regionserver/RegionCoprocessorHost.java | 43 ++++++++
.../TestRegionCoprocessorQuotaUsage.java | 122 +++++++++++++++++++++
9 files changed, 342 insertions(+), 38 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
index 8967f596379..923a719264b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
@@ -26,7 +26,11 @@ import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.OnlineRegions;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.security.User;
@@ -127,4 +131,52 @@ public interface RegionCoprocessorEnvironment extends
CoprocessorEnvironment<Reg
* @return the RawCellBuilder
*/
RawCellBuilder getCellBuilder();
+
+ /**
+ * Returns an RpcQuotaManager that can be used to apply quota checks against
the workloads
+ * generated by the coprocessor.
+ * @return the RpcQuotaManager
+ */
+ RpcQuotaManager getRpcQuotaManager();
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation.
+ * @param scan the scan to be estimated against
the quota
+ * @param maxBlockBytesScanned the maximum bytes scanned in a
single RPC call by the
+ * scanner
+ * @param prevBlockBytesScannedDifference the difference between BBS of the
previous two next
+ * calls
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
+ long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException;
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This
method does not support
+ * scans because estimating a scan's workload is more complicated than
estimating the workload of
+ * a get/put.
+ * @param region the region where the operation will be performed
+ * @param type the operation type
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkBatchQuota(final Region region, final
OperationQuota.OperationType type)
+ throws IOException, RpcThrottlingException;
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This
method does not support
+ * scans because estimating a scan's workload is more complicated than
estimating the workload of
+ * a get/put.
+ * @param region the region where the operation will be performed
+ * @param numWrites number of writes to count against quota
+ * @param numReads number of reads to count against quota
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkBatchQuota(final Region region, int numWrites, int
numReads)
+ throws IOException, RpcThrottlingException;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index a387c04e4e5..29c3667fb35 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.quotas;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
@@ -181,6 +182,11 @@ public class DefaultOperationQuota implements
OperationQuota {
operationSize[OperationType.SCAN.ordinal()] +=
QuotaUtil.calculateResultSize(results);
}
+ @Override
+ public void addScanResultCells(final List<Cell> cells) {
+ operationSize[OperationType.SCAN.ordinal()] +=
QuotaUtil.calculateCellsSize(cells);
+ }
+
@Override
public void addMutation(final Mutation mutation) {
operationSize[OperationType.MUTATE.ordinal()] +=
QuotaUtil.calculateMutationSize(mutation);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 736560e6fd1..63cf97188d8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.quotas;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
@@ -81,4 +82,9 @@ class NoopOperationQuota implements OperationQuota {
public long getReadConsumed() {
return 0L;
}
+
+ @Override
+ public void addScanResultCells(List<Cell> cells) {
+ // no-op
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index ef0a35fa589..0d9b48b6074 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.quotas;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
@@ -88,6 +89,12 @@ public interface OperationQuota {
*/
void addScanResult(List<Result> results);
+ /**
+ * Add a scan result in the form of cells. This will be used to calculate
the exact quota and have
+ * a better long-read average size for the next time.
+ */
+ void addScanResultCells(List<Cell> cells);
+
/**
* Add a mutation result. This will be used to calculate the exact quota and
have a better
* mutation average size for the next time.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 8ced76e3963..b4887392196 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -590,6 +590,14 @@ public class QuotaUtil extends QuotaTableUtil {
return size;
}
+ public static long calculateCellsSize(final List<Cell> cells) {
+ long size = 0;
+ for (Cell cell : cells) {
+ size += cell.getSerializedSize();
+ }
+ return size;
+ }
+
/**
* Method to enable a table, if not already enabled. This method suppresses
* {@link TableNotDisabledException} and {@link TableNotFoundException}, if
thrown while enabling
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 92a0cfd5c13..f9a7ccba401 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -43,7 +43,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class RegionServerRpcQuotaManager {
+public class RegionServerRpcQuotaManager implements RpcQuotaManager {
private static final Logger LOG =
LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
private final RegionServerServices rsServices;
@@ -154,21 +154,7 @@ public class RegionServerRpcQuotaManager {
return NoopOperationQuota.get();
}
- /**
- * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
- * available quota and to report the data/usage of the operation. This
method is specific to scans
- * because estimating a scan's workload is more complicated than estimating
the workload of a
- * get/put.
- * @param region the region where the operation
will be performed
- * @param scanRequest the scan to be estimated against
the quota
- * @param maxScannerResultSize the maximum bytes to be returned
by the scanner
- * @param maxBlockBytesScanned the maximum bytes scanned in a
single RPC call by the
- * scanner
- * @param prevBlockBytesScannedDifference the difference between BBS of the
previous two next
- * calls
- * @return the OperationQuota
- * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
- */
+ @Override
public OperationQuota checkScanQuota(final Region region,
final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
@@ -195,16 +181,7 @@ public class RegionServerRpcQuotaManager {
return quota;
}
- /**
- * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
- * available quota and to report the data/usage of the operation. This
method does not support
- * scans because estimating a scan's workload is more complicated than
estimating the workload of
- * a get/put.
- * @param region the region where the operation will be performed
- * @param type the operation type
- * @return the OperationQuota
- * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
- */
+ @Override
public OperationQuota checkBatchQuota(final Region region,
final OperationQuota.OperationType type) throws IOException,
RpcThrottlingException {
switch (type) {
@@ -218,17 +195,7 @@ public class RegionServerRpcQuotaManager {
throw new RuntimeException("Invalid operation type: " + type);
}
- /**
- * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
- * available quota and to report the data/usage of the operation. This
method does not support
- * scans because estimating a scan's workload is more complicated than
estimating the workload of
- * a get/put.
- * @param region the region where the operation will be performed
- * @param actions the "multi" actions to perform
- * @param hasCondition whether the RegionAction has a condition
- * @return the OperationQuota
- * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
- */
+ @Override
public OperationQuota checkBatchQuota(final Region region,
final List<ClientProtos.Action> actions, boolean hasCondition)
throws IOException, RpcThrottlingException {
@@ -258,7 +225,8 @@ public class RegionServerRpcQuotaManager {
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
*/
- private OperationQuota checkBatchQuota(final Region region, final int numWrites,
+ @Override
+ public OperationQuota checkBatchQuota(final Region region, final int numWrites,
final int numReads) throws IOException, RpcThrottlingException {
Optional<User> user = RpcServer.getRequestUser();
UserGroupInformation ugi;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java
new file mode 100644
index 00000000000..60392ca3b3f
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface RpcQuotaManager {
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This
method is specific to scans
+ * because estimating a scan's workload is more complicated than estimating
the workload of a
+ * get/put.
+ * @param region the region where the operation
will be performed
+ * @param scanRequest the scan to be estimated against
the quota
+ * @param maxScannerResultSize the maximum bytes to be returned
by the scanner
+ * @param maxBlockBytesScanned the maximum bytes scanned in a
single RPC call by the
+ * scanner
+ * @param prevBlockBytesScannedDifference the difference between BBS of the
previous two next
+ * calls
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkScanQuota(final Region region, final
ClientProtos.ScanRequest scanRequest,
+ long maxScannerResultSize, long maxBlockBytesScanned, long
prevBlockBytesScannedDifference)
+ throws IOException, RpcThrottlingException;
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This
method does not support
+ * scans because estimating a scan's workload is more complicated than
estimating the workload of
+ * a get/put.
+ * @param region the region where the operation will be performed
+ * @param type the operation type
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkBatchQuota(final Region region, final
OperationQuota.OperationType type)
+ throws IOException, RpcThrottlingException;
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This
method does not support
+ * scans because estimating a scan's workload is more complicated than
estimating the workload of
+ * a get/put.
+ * @param region the region where the operation will be performed
+ * @param actions the "multi" actions to perform
+ * @param hasCondition whether the RegionAction has a condition
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkBatchQuota(final Region region, final
List<ClientProtos.Action> actions,
+ boolean hasCondition) throws IOException, RpcThrottlingException;
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This
method does not support
+ * scans because estimating a scan's workload is more complicated than
estimating the workload of
+ * a get/put.
+ * @param region the region where the operation will be performed
+ * @param numWrites number of writes to count against quota
+ * @param numReads number of reads to count against quota
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
+ */
+ OperationQuota checkBatchQuota(final Region region, int numWrites, int
numReads)
+ throws IOException, RpcThrottlingException;
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 0e608fc8dcd..929b24e521a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -69,6 +69,9 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -85,6 +88,9 @@ import org.slf4j.LoggerFactory;
import
org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
import
org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/**
* Implements the coprocessor environment and runtime support for coprocessors
loaded within a
* {@link Region}.
@@ -118,6 +124,7 @@ public class RegionCoprocessorHost
ConcurrentMap<String, Object> sharedData;
private final MetricRegistry metricRegistry;
private final RegionServerServices services;
+ private final RpcQuotaManager rpcQuotaManager;
/**
* Constructor
@@ -133,6 +140,13 @@ public class RegionCoprocessorHost
this.services = services;
this.metricRegistry =
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
+ // Some unit tests reach this line with services == null, and are okay
with rpcQuotaManager
+ // being null. Let these unit tests succeed. This should not happen in
real usage.
+ if (services != null) {
+ this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
+ } else {
+ this.rpcQuotaManager = null;
+ }
}
/** Returns the region */
@@ -188,6 +202,35 @@ public class RegionCoprocessorHost
// We always do a DEEP_COPY only
return RawCellBuilderFactory.create();
}
+
+ @Override
+ public RpcQuotaManager getRpcQuotaManager() {
+ return rpcQuotaManager;
+ }
+
+ @Override
+ public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
+ long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
+ ClientProtos.ScanRequest scanRequest = RequestConverter
+ .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
+ long maxScannerResultSize =
+ services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
+ HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
+ return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
+ maxBlockBytesScanned, prevBlockBytesScannedDifference);
+ }
+
+ @Override
+ public OperationQuota checkBatchQuota(Region region,
OperationQuota.OperationType type)
+ throws IOException, RpcThrottlingException {
+ return rpcQuotaManager.checkBatchQuota(region, type);
+ }
+
+ @Override
+ public OperationQuota checkBatchQuota(final Region region, int numWrites,
int numReads)
+ throws IOException, RpcThrottlingException {
+ return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads);
+ }
}
/**
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java
new file mode 100644
index 00000000000..4a638d965b3
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestRegionCoprocessorQuotaUsage {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class);
+
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static TableName TABLE_NAME =
TableName.valueOf("TestRegionCoprocessorQuotaUsage");
+ private static byte[] CF = Bytes.toBytes("CF");
+ private static byte[] CQ = Bytes.toBytes("CQ");
+ private static Connection CONN;
+ private static Table TABLE;
+ private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false);
+
+ public static class MyRegionObserver implements RegionObserver {
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get
get,
+ List<Cell> result) throws IOException {
+
+ // For the purposes of this test, we only need to catch a throttle
happening once, then
+ // let future requests pass through so we don't make this test take any
longer than necessary
+ if (!THROTTLING_OCCURRED.get()) {
+ try {
+ c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
+ OperationQuota.OperationType.GET);
+ } catch (RpcThrottlingException e) {
+ THROTTLING_OCCURRED.set(true);
+ throw e;
+ }
+ }
+ }
+ }
+
+ public static class MyCoprocessor implements RegionCoprocessor {
+ RegionObserver observer = new MyRegionObserver();
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(observer);
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setBoolean("hbase.quota.enabled", true);
+ conf.setInt("hbase.quota.default.user.machine.read.num", 2);
+ conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
+ conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000");
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MyCoprocessor.class.getName());
+ UTIL.startMiniCluster(3);
+ byte[][] splitKeys = new byte[8][];
+ for (int i = 111; i < 999; i += 111) {
+ splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+ }
+ UTIL.createTable(TABLE_NAME, CF, splitKeys);
+ CONN = UTIL.getConnection();
+ TABLE = CONN.getTable(TABLE_NAME);
+ TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ,
Bytes.toBytes(0L)));
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testGet() throws InterruptedException, ExecutionException,
IOException {
+ // Hit the table 5 times which ought to be enough to make a throttle happen
+ for (int i = 0; i < 5; i++) {
+ TABLE.get(new Get(Bytes.toBytes("000")));
+ }
+ assertTrue("Throttling did not happen as expected",
THROTTLING_OCCURRED.get());
+ }
+}