This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e2a10f61800 HBASE-27784: support quota user overrides (#5424)
e2a10f61800 is described below

commit e2a10f61800fe130234f80869a7ab01967cde285
Author: Ray Mattingly <rmdmattin...@gmail.com>
AuthorDate: Wed Sep 27 13:58:13 2023 -0400

    HBASE-27784: support quota user overrides (#5424)
    
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../java/org/apache/hadoop/hbase/ipc/RpcCall.java  |  12 ++-
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  13 +++
 .../org/apache/hadoop/hbase/quotas/QuotaCache.java |  36 ++++++-
 .../hbase/namequeues/TestNamedQueueRecorder.java   |   5 +
 .../store/region/TestRegionProcedureStore.java     |   5 +
 .../hadoop/hbase/quotas/TestQuotaUserOverride.java | 116 +++++++++++++++++++++
 6 files changed, 185 insertions(+), 2 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
index 0555202f88b..260d6e1a980 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -92,11 +92,21 @@ public interface RpcCall extends RpcCallContext {
   Map<String, byte[]> getConnectionAttributes();
 
   /**
-   * Returns the map of attributes specified when building the request.
+   * Returns the map of attributes specified when building the request. This 
map is lazily evaluated
+   * so if you only need a single attribute then it may be cheaper to use
+   * {@link #getRequestAttribute(String)}
    * @see 
org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
    */
   Map<String, byte[]> getRequestAttributes();
 
+  /**
+   * Returns a single request attribute value, or null if no value is present. 
If you need many
+   * request attributes then you should fetch the lazily evaluated map via
+   * {@link #getRequestAttributes()}
+   * @see 
org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
+   */
+  byte[] getRequestAttribute(String key);
+
   /** Returns Port of remote address in this call */
   int getRemotePort();
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 66a2e44fac1..ed688977b96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -234,6 +234,19 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     return this.requestAttributes;
   }
 
+  @Override
+  public byte[] getRequestAttribute(String key) {
+    if (this.requestAttributes == null) {
+      for (HBaseProtos.NameBytesPair nameBytesPair : 
header.getAttributeList()) {
+        if (nameBytesPair.getName().equals(key)) {
+          return nameBytesPair.getValue().toByteArray();
+        }
+      }
+      return null;
+    }
+    return this.requestAttributes.get(key);
+  }
+
   @Override
   public int getPriority() {
     return this.header.getPriority();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index 0f7b5e42e68..0a57b9fd8f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -35,8 +36,11 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +61,11 @@ public class QuotaCache implements Stoppable {
   private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
 
   public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
+
+  // defines the request attribute key which, when provided, will override the 
request's username
+  // from the perspective of user quotas
+  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
+    "hbase.quota.user.override.key";
   private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
   private static final int EVICT_PERIOD_FACTOR = 5; // N * 
REFRESH_DEFAULT_PERIOD
 
@@ -74,12 +83,15 @@ public class QuotaCache implements Stoppable {
   private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
     new ConcurrentHashMap<>();
   private final RegionServerServices rsServices;
+  private final String userOverrideRequestAttributeKey;
 
   private QuotaRefresherChore refreshChore;
   private boolean stopped = true;
 
   public QuotaCache(final RegionServerServices rsServices) {
     this.rsServices = rsServices;
+    this.userOverrideRequestAttributeKey =
+      
rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
   }
 
   public void start() throws IOException {
@@ -125,7 +137,7 @@ public class QuotaCache implements Stoppable {
    * @return the quota info associated to specified user
    */
   public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
-    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), 
UserQuotaState::new,
+    return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), 
UserQuotaState::new,
       this::triggerCacheRefresh);
   }
 
@@ -160,6 +172,28 @@ public class QuotaCache implements Stoppable {
     return exceedThrottleQuotaEnabled;
   }
 
+  /**
+   * Applies a request attribute user override if available, otherwise returns 
the UGI's short
+   * username
+   * @param ugi The request's UserGroupInformation
+   */
+  private String getQuotaUserName(final UserGroupInformation ugi) {
+    if (userOverrideRequestAttributeKey == null) {
+      return ugi.getShortUserName();
+    }
+
+    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
+    if (!rpcCall.isPresent()) {
+      return ugi.getShortUserName();
+    }
+
+    byte[] override = 
rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
+    if (override == null) {
+      return ugi.getShortUserName();
+    }
+    return Bytes.toString(override);
+  }
+
   /**
    * Returns the QuotaState requested. If the quota info is not in cache an 
empty one will be
    * returned and the quota request will be enqueued for the next cache 
refresh.
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index af6c51260fd..35a1757115c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -771,6 +771,11 @@ public class TestNamedQueueRecorder {
           pair -> pair.getValue().toByteArray()));
       }
 
+      @Override
+      public byte[] getRequestAttribute(String key) {
+        return null;
+      }
+
       @Override
       public int getRemotePort() {
         return 0;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index 83f788ba151..305f0e29e95 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -232,6 +232,11 @@ public class TestRegionProcedureStore extends 
RegionProcedureStoreTestBase {
         return null;
       }
 
+      @Override
+      public byte[] getRequestAttribute(String key) {
+        return null;
+      }
+
       @Override
       public int getRemotePort() {
         return 0;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
new file mode 100644
index 00000000000..75b3cc3ca84
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestQuotaUserOverride {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestQuotaUserOverride.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final int NUM_SERVERS = 1;
+  private static final String CUSTOM_OVERRIDE_KEY = "foo";
+
+  private static final TableName TABLE_NAME = 
TableName.valueOf("TestQuotaUserOverride");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 
500);
+    
TEST_UTIL.getConfiguration().set(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
+      CUSTOM_OVERRIDE_KEY);
+    TEST_UTIL.startMiniCluster(NUM_SERVERS);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+    QuotaCache.TEST_FORCE_REFRESH = true;
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    EnvironmentEdgeManager.reset();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testUserGlobalThrottleWithCustomOverride() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final String userOverrideWithQuota = User.getCurrent().getShortName() + 
"123";
+
+    // Add 6req/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
+      ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
+
+    Table tableWithThrottle = 
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
+      .setRequestAttribute(CUSTOM_OVERRIDE_KEY, 
Bytes.toBytes(userOverrideWithQuota)).build();
+    Table tableWithoutThrottle = 
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
+      
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
+        Bytes.toBytes(userOverrideWithQuota))
+      .build();
+    Table tableWithoutThrottle2 =
+      TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build();
+
+    // warm things up
+    doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
+    doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
+    doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2);
+
+    // should reject some requests
+    assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
+    // should accept all puts
+    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
+    // should accept all puts
+    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
+    Thread.sleep(60_000);
+    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
+    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
+    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
+  }
+
+}

Reply via email to