This is an automated email from the ASF dual-hosted git repository.

rmattingly pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 3757b63373c HBASE-28963 Updating Quota Factors is too expensive (#6451) (#6467)
3757b63373c is described below

commit 3757b63373c8459720ee09cf30c8028e45fa6bdd
Author: Ray Mattingly <[email protected]>
AuthorDate: Wed Nov 13 17:59:55 2024 -0500

    HBASE-28963 Updating Quota Factors is too expensive (#6451) (#6467)
    
    Signed-off-by: Nick Dimiduk <[email protected]>
    Co-authored-by: Ray Mattingly <[email protected]>
---
 .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 134 ++++++++++++++++++---
 .../apache/hadoop/hbase/quotas/UserQuotaState.java |   4 +
 .../apache/hadoop/hbase/quotas/TestQuotaCache.java |   4 +-
 .../hadoop/hbase/quotas/TestQuotaUserOverride.java |  15 +--
 4 files changed, 127 insertions(+), 30 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index 9b3498ff894..3999ba643a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.quotas;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -28,6 +29,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -48,6 +50,10 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
 /**
  * Cache that keeps track of the quota settings for the users and tables that 
are interacting with
  * it. To avoid blocking the operations if the requested quota is not in cache 
an "empty quota" will
@@ -61,6 +67,10 @@ public class QuotaCache implements Stoppable {
   private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
 
   public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
+  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
+    "hbase.quota.cache.ttl.region.states.ms";
+  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
+    "hbase.quota.cache.ttl.servers.size.ms";
 
   // defines the request attribute key which, when provided, will override the 
request's username
   // from the perspective of user quotas
@@ -102,7 +112,7 @@ public class QuotaCache implements Stoppable {
     // TODO: This will be replaced once we have the notification bus ready.
     Configuration conf = rsServices.getConfiguration();
     int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
-    refreshChore = new QuotaRefresherChore(period, this);
+    refreshChore = new QuotaRefresherChore(conf, period, this);
     rsServices.getChoreService().scheduleChore(refreshChore);
   }
 
@@ -140,8 +150,7 @@ public class QuotaCache implements Stoppable {
    */
   public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
     return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
-      () -> 
QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L),
-      this::triggerCacheRefresh);
+      () -> 
QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
   }
 
   /**
@@ -202,7 +211,7 @@ public class QuotaCache implements Stoppable {
    * returned and the quota request will be enqueued for the next cache 
refresh.
    */
   private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> 
quotasMap, final K key) {
-    return computeIfAbsent(quotasMap, key, QuotaState::new, 
this::triggerCacheRefresh);
+    return computeIfAbsent(quotasMap, key, QuotaState::new);
   }
 
   void triggerCacheRefresh() {
@@ -233,8 +242,33 @@ public class QuotaCache implements Stoppable {
   private class QuotaRefresherChore extends ScheduledChore {
     private long lastUpdate = 0;
 
-    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
+    // Querying cluster metrics so often, per-RegionServer, limits horizontal 
scalability.
+    // So we cache the results to reduce that load.
+    private final RefreshableExpiringValueCache<ClusterMetrics> 
tableRegionStatesClusterMetrics;
+    private final RefreshableExpiringValueCache<Integer> regionServersSize;
+
+    public QuotaRefresherChore(Configuration conf, final int period, final 
Stoppable stoppable) {
       super("QuotaRefresherChore", stoppable, period);
+
+      Duration tableRegionStatesCacheTtl =
+        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, 
period));
+      this.tableRegionStatesClusterMetrics =
+        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
+          tableRegionStatesCacheTtl, () -> 
rsServices.getConnection().getAdmin()
+            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, 
Option.TABLE_TO_REGIONS_COUNT)));
+
+      Duration regionServersSizeCacheTtl =
+        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, 
period));
+      regionServersSize =
+        new RefreshableExpiringValueCache<>("regionServersSize", 
regionServersSizeCacheTtl,
+          () -> 
rsServices.getConnection().getAdmin().getRegionServers().size());
+    }
+
+    @Override
+    public synchronized boolean triggerNow() {
+      tableRegionStatesClusterMetrics.invalidate();
+      regionServersSize.invalidate();
+      return super.triggerNow();
     }
 
     @Override
@@ -395,21 +429,40 @@ public class QuotaCache implements Stoppable {
      * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] 
as machine factor.
      */
     private void updateQuotaFactors() {
-      // Update machine quota factor
-      ClusterMetrics clusterMetrics;
-      try {
-        clusterMetrics = rsServices.getConnection().getAdmin()
-          .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, 
Option.TABLE_TO_REGIONS_COUNT));
-      } catch (IOException e) {
-        LOG.warn("Failed to get cluster metrics needed for updating quotas", 
e);
-        return;
+      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
+        || 
userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
+      if (hasTableQuotas) {
+        updateTableMachineQuotaFactors();
+      } else {
+        updateOnlyMachineQuotaFactors();
       }
+    }
 
-      int rsSize = clusterMetrics.getServersName().size();
-      if (rsSize != 0) {
-        // TODO if use rs group, the cluster limit should be shared by the rs 
group
-        machineQuotaFactor = 1.0 / rsSize;
+    /**
+     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} 
and should be used if
+     * we don't have any table quotas in the cache.
+     */
+    private void updateOnlyMachineQuotaFactors() {
+      Optional<Integer> rsSize = regionServersSize.get();
+      if (rsSize.isPresent()) {
+        updateMachineQuotaFactors(rsSize.get());
+      } else {
+        regionServersSize.refresh();
       }
+    }
+
+    /**
+     * This will call {@link #updateMachineQuotaFactors(int)}, and then update 
the table machine
+     * factors as well. This relies on a more expensive query for 
ClusterMetrics.
+     */
+    private void updateTableMachineQuotaFactors() {
+      Optional<ClusterMetrics> clusterMetricsMaybe = 
tableRegionStatesClusterMetrics.get();
+      if (!clusterMetricsMaybe.isPresent()) {
+        tableRegionStatesClusterMetrics.refresh();
+        return;
+      }
+      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
+      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
 
       Map<TableName, RegionStatesCount> tableRegionStatesCount =
         clusterMetrics.getTableRegionStatesCount();
@@ -436,6 +489,53 @@ public class QuotaCache implements Stoppable {
         }
       }
     }
+
+    private void updateMachineQuotaFactors(int rsSize) {
+      if (rsSize != 0) {
+        // TODO if use rs group, the cluster limit should be shared by the rs 
group
+        machineQuotaFactor = 1.0 / rsSize;
+      }
+    }
+  }
+
+  static class RefreshableExpiringValueCache<T> {
+    private final String name;
+    private final LoadingCache<String, Optional<T>> cache;
+
+    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
+      ThrowingSupplier<T> supplier) {
+      this.name = name;
+      this.cache =
+        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), 
TimeUnit.MILLISECONDS)
+          .build(new CacheLoader<>() {
+            @Override
+            public Optional<T> load(String key) {
+              try {
+                return Optional.of(supplier.get());
+              } catch (Exception e) {
+                LOG.warn("Failed to refresh cache {}", name, e);
+                return Optional.empty();
+              }
+            }
+          });
+    }
+
+    Optional<T> get() {
+      return cache.getUnchecked(name);
+    }
+
+    void refresh() {
+      cache.refresh(name);
+    }
+
+    void invalidate() {
+      cache.invalidate(name);
+    }
+  }
+
+  @FunctionalInterface
+  static interface ThrowingSupplier<T> {
+    T get() throws Exception;
   }
 
   static interface Fetcher<Key, Value> {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
index 6078c687d3d..a3ec9799436 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
@@ -117,6 +117,10 @@ public class UserQuotaState extends QuotaState {
     namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
   }
 
+  public boolean hasTableLimiters() {
+    return tableLimiters != null && !tableLimiters.isEmpty();
+  }
+
   private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, 
final K key,
     final Quotas quotas) {
     if (limiters == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java
index 89c77f43b35..9dcf79cd574 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java
@@ -40,7 +40,7 @@ public class TestQuotaCache {
     HBaseClassTestRule.forClass(TestQuotaCache.class);
 
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
-  private static final int REFRESH_TIME = 30_000;
+  private static final int REFRESH_TIME_MS = 1000;
 
   @After
   public void tearDown() throws Exception {
@@ -52,7 +52,7 @@ public class TestQuotaCache {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
-    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 
REFRESH_TIME);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 
REFRESH_TIME_MS);
     
TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM,
 1000);
 
     TEST_UTIL.startMiniCluster(1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
index 75b3cc3ca84..54931d47122 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
@@ -78,39 +78,32 @@ public class TestQuotaUserOverride {
   @Test
   public void testUserGlobalThrottleWithCustomOverride() throws Exception {
     final Admin admin = TEST_UTIL.getAdmin();
-    final String userOverrideWithQuota = User.getCurrent().getShortName() + 
"123";
+    final String userOverrideWithQuota = User.getCurrent().getShortName();
 
     // Add 6req/min limit
     admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
       ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, 
TABLE_NAME);
 
     Table tableWithThrottle = 
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
       .setRequestAttribute(CUSTOM_OVERRIDE_KEY, 
Bytes.toBytes(userOverrideWithQuota)).build();
     Table tableWithoutThrottle = 
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
-      
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
-        Bytes.toBytes(userOverrideWithQuota))
-      .build();
-    Table tableWithoutThrottle2 =
-      TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build();
+      .setRequestAttribute(CUSTOM_OVERRIDE_KEY, 
Bytes.toBytes("anotherUser")).build();
 
     // warm things up
     doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
     doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
-    doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2);
 
     // should reject some requests
     assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
     // should accept all puts
     assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
-    // should accept all puts
-    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
 
     // Remove all the limits
     admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
-    Thread.sleep(60_000);
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
     assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
     assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
-    assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
   }
 
 }

Reply via email to