This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 68cce0cad36 HBASE-28349 Count atomic operations against read quotas
(#5670)
68cce0cad36 is described below
commit 68cce0cad36b42383730fc4e8bfeab130ec352a1
Author: Ray Mattingly <[email protected]>
AuthorDate: Fri Feb 9 08:31:29 2024 -0500
HBASE-28349 Count atomic operations against read quotas (#5670)
Signed-off-by: Bryan Beaudreault <[email protected]>
---
.../apache/hadoop/hbase/quotas/OperationQuota.java | 3 +-
.../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 26 +++
.../hbase/quotas/RegionServerRpcQuotaManager.java | 16 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 9 +-
.../hadoop/hbase/quotas/TestAtomicReadQuota.java | 245 +++++++++++++++++++++
5 files changed, 291 insertions(+), 8 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index e18d3eb3495..ffc3cd50825 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -32,7 +32,8 @@ public interface OperationQuota {
public enum OperationType {
MUTATE,
GET,
- SCAN
+ SCAN,
+ CHECK_AND_MUTATE
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 8e7bce8b1e7..831c0297785 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
@@ -176,6 +177,31 @@ public class QuotaUtil extends QuotaTableUtil {
deleteQuotas(connection, getRegionServerRowKey(regionServer));
}
+ public static OperationQuota.OperationType
getQuotaOperationType(ClientProtos.Action action,
+ boolean hasCondition) {
+ if (action.hasMutation()) {
+ return getQuotaOperationType(action.getMutation(), hasCondition);
+ }
+ return OperationQuota.OperationType.GET;
+ }
+
+ public static OperationQuota.OperationType
+ getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
+ return getQuotaOperationType(mutateRequest.getMutation(),
mutateRequest.hasCondition());
+ }
+
+ private static OperationQuota.OperationType
+ getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean
hasCondition) {
+ ClientProtos.MutationProto.MutationType mutationType =
mutationProto.getMutateType();
+ if (
+ hasCondition || mutationType ==
ClientProtos.MutationProto.MutationType.APPEND
+ || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
+ ) {
+ return OperationQuota.OperationType.CHECK_AND_MUTATE;
+ }
+ return OperationQuota.OperationType.MUTATE;
+ }
+
protected static void switchExceedThrottleQuota(final Connection connection,
boolean exceedThrottleQuotaEnabled) throws IOException {
if (exceedThrottleQuotaEnabled) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index de76303e27a..3c72c662887 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -171,6 +171,8 @@ public class RegionServerRpcQuotaManager {
return checkQuota(region, 0, 1, 0);
case MUTATE:
return checkQuota(region, 1, 0, 0);
+ case CHECK_AND_MUTATE:
+ return checkQuota(region, 1, 1, 0);
}
throw new RuntimeException("Invalid operation type: " + type);
}
@@ -178,18 +180,24 @@ public class RegionServerRpcQuotaManager {
/**
* Check the quota for the current (rpc-context) user. Returns the
OperationQuota used to get the
* available quota and to report the data/usage of the operation.
- * @param region the region where the operation will be performed
- * @param actions the "multi" actions to perform
+ * @param region the region where the operation will be performed
+ * @param actions the "multi" actions to perform
+ * @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to
quota exceeded.
*/
- public OperationQuota checkQuota(final Region region, final
List<ClientProtos.Action> actions)
- throws IOException, RpcThrottlingException {
+ public OperationQuota checkQuota(final Region region, final
List<ClientProtos.Action> actions,
+ boolean hasCondition) throws IOException, RpcThrottlingException {
int numWrites = 0;
int numReads = 0;
for (final ClientProtos.Action action : actions) {
if (action.hasMutation()) {
numWrites++;
+ OperationQuota.OperationType operationType =
+ QuotaUtil.getQuotaOperationType(action, hasCondition);
+ if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
+ numReads++;
+ }
} else if (action.hasGet()) {
numReads++;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index e107ab80e13..0330e523438 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2751,7 +2751,8 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
try {
region = getRegion(regionSpecifier);
- quota = getRpcQuotaManager().checkQuota(region,
regionAction.getActionList());
+ quota = getRpcQuotaManager().checkQuota(region,
regionAction.getActionList(),
+ regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder,
regionAction, cellScanner, e);
return responseBuilder.build();
@@ -2802,7 +2803,8 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
try {
region = getRegion(regionSpecifier);
- quota = getRpcQuotaManager().checkQuota(region,
regionAction.getActionList());
+ quota = getRpcQuotaManager().checkQuota(region,
regionAction.getActionList(),
+ regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder,
regionAction, cellScanner, e);
continue; // For this region it's a failure.
@@ -2957,7 +2959,8 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() :
HConstants.NO_NONCE;
- quota = getRpcQuotaManager().checkQuota(region,
OperationQuota.OperationType.MUTATE);
+ OperationQuota.OperationType operationType =
QuotaUtil.getQuotaOperationType(request);
+ quota = getRpcQuotaManager().checkQuota(region, operationType);
ActivePolicyEnforcement spaceQuotaEnforcement =
getSpaceQuotaManager().getActiveEnforcements();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
new file mode 100644
index 00000000000..9217ff14b97
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAtomicReadQuota {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(TestAtomicReadQuota.class);
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ private static final TableName TABLE_NAME =
TableName.valueOf(UUID.randomUUID().toString());
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+ EnvironmentEdgeManager.reset();
+ TEST_UTIL.deleteTable(TABLE_NAME);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
+ TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+ TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ QuotaCache.TEST_FORCE_REFRESH = true;
+ }
+
+ @Test
+ public void testIncrementCountedAgainstReadCapacity() throws Exception {
+ setupQuota();
+
+ Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+ inc.addColumn(FAMILY, QUALIFIER, 1);
+ testThrottle(table -> table.increment(inc));
+ }
+
+ @Test
+ public void testConditionalRowMutationsCountedAgainstReadCapacity() throws
Exception {
+ setupQuota();
+
+ byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+ Increment inc = new Increment(row);
+ inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+ RowMutations rowMutations = new RowMutations(row);
+ rowMutations.add(inc);
+ rowMutations.add(put);
+ testThrottle(table -> table.mutateRow(rowMutations));
+ }
+
+ @Test
+ public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws
Exception {
+ setupQuota();
+
+ byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+ RowMutations rowMutations = new RowMutations(row);
+ rowMutations.add(put);
+ try (Table table = getTable()) {
+ for (int i = 0; i < 100; i++) {
+ table.mutateRow(rowMutations);
+ }
+ }
+ }
+
+ @Test
+ public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
+ setupQuota();
+
+ byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+ try (Table table = getTable()) {
+ for (int i = 0; i < 100; i++) {
+ table.put(put);
+ }
+ }
+ }
+
+ @Test
+ public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
+ setupQuota();
+
+ Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+ put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+ Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+ put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+ Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+ inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+ List<Put> puts = new ArrayList<>(2);
+ puts.add(put1);
+ puts.add(put2);
+
+ try (Table table = getTable()) {
+ for (int i = 0; i < 100; i++) {
+ table.put(puts);
+ }
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
+ setupQuota();
+
+ byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+ byte[] value = Bytes.toBytes("v");
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
+ CheckAndMutate checkAndMutate =
+ CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER,
value).build(put);
+
+ testThrottle(table -> table.checkAndMutate(checkAndMutate));
+ }
+
+ @Test
+ public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
+ setupQuota();
+
+ byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+ Increment inc = new Increment(row);
+ inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+ List<Increment> incs = new ArrayList<>(2);
+ incs.add(inc);
+ incs.add(inc);
+
+ testThrottle(table -> {
+ Object[] results = new Object[incs.size()];
+ table.batch(incs, results);
+ return results;
+ });
+ }
+
+ private void setupQuota() throws Exception {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+
admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
+ ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
+ }
+ ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false,
TABLE_NAME);
+ }
+
+ private void cleanupQuota() throws Exception {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+
admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
+ }
+ ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+ }
+
+ private void testThrottle(ThrowingFunction<Table, ?> request) throws
Exception {
+ try (Table table = getTable()) {
+ // we have a read quota configured, so this should fail
+ TEST_UTIL.waitFor(60_000, () -> {
+ boolean success;
+ Exception ex;
+ try {
+ request.run(table);
+ return false;
+ } catch (RetriesExhaustedWithDetailsException e) {
+ success = e.getCauses().stream().allMatch(t -> t instanceof
RpcThrottlingException);
+ ex = e;
+ } catch (Exception e) {
+ success = e.getCause() instanceof RpcThrottlingException;
+ ex = e;
+ }
+ if (!success) {
+ LOG.error("Unexpected exception", ex);
+ }
+ return success;
+ });
+ } finally {
+ cleanupQuota();
+ }
+ }
+
+ private Table getTable() throws IOException {
+ return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME,
null).setOperationTimeout(250)
+ .build();
+ }
+
+ @FunctionalInterface
+ private interface ThrowingFunction<I, O> {
+ O run(I input) throws Exception;
+ }
+
+}