This is an automated email from the ASF dual-hosted git repository. rmattingly pushed a commit to branch HBASE-28963-branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 9f2dcbc74cc46e7d36c580f1ce62d5a932c65e6b Author: Ray Mattingly <[email protected]> AuthorDate: Wed Nov 13 10:21:38 2024 -0500 HBASE-28963 Updating Quota Factors is too expensive (#6451) Co-authored-by: Ray Mattingly <[email protected]> Signed-off-by: Nick Dimiduk <[email protected]> --- .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 134 ++++++++++++++++++--- .../apache/hadoop/hbase/quotas/UserQuotaState.java | 4 + .../apache/hadoop/hbase/quotas/TestQuotaCache.java | 4 +- .../hadoop/hbase/quotas/TestQuotaUserOverride.java | 15 +-- 4 files changed, 127 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 9b3498ff894..3999ba643a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.quotas; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -28,6 +29,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -48,6 +50,10 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; + /** * Cache that keeps track of the quota settings for the users and tables that are interacting with * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will @@ -61,6 +67,10 @@ public class QuotaCache implements Stoppable { private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; + public static final String TABLE_REGION_STATES_CACHE_TTL_MS = + "hbase.quota.cache.ttl.region.states.ms"; + public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS = + "hbase.quota.cache.ttl.servers.size.ms"; // defines the request attribute key which, when provided, will override the request's username // from the perspective of user quotas @@ -102,7 +112,7 @@ public class QuotaCache implements Stoppable { // TODO: This will be replaced once we have the notification bus ready. Configuration conf = rsServices.getConfiguration(); int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); - refreshChore = new QuotaRefresherChore(period, this); + refreshChore = new QuotaRefresherChore(conf, period, this); rsServices.getChoreService().scheduleChore(refreshChore); } @@ -140,8 +150,7 @@ public class QuotaCache implements Stoppable { */ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), - () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L), - this::triggerCacheRefresh); + () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); } /** @@ -202,7 +211,7 @@ public class QuotaCache implements Stoppable { * returned and the quota request will be enqueued for the next cache refresh. */ private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) { - return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); + return computeIfAbsent(quotasMap, key, QuotaState::new); } void triggerCacheRefresh() { @@ -233,8 +242,33 @@ public class QuotaCache implements Stoppable { private class QuotaRefresherChore extends ScheduledChore { private long lastUpdate = 0; - public QuotaRefresherChore(final int period, final Stoppable stoppable) { + // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability. + // So we cache the results to reduce that load. + private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics; + private final RefreshableExpiringValueCache<Integer> regionServersSize; + + public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) { super("QuotaRefresherChore", stoppable, period); + + Duration tableRegionStatesCacheTtl = + Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period)); + this.tableRegionStatesClusterMetrics = + new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics", + tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin() + .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT))); + + Duration regionServersSizeCacheTtl = + Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period)); + regionServersSize = + new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl, + () -> rsServices.getConnection().getAdmin().getRegionServers().size()); + } + + @Override + public synchronized boolean triggerNow() { + tableRegionStatesClusterMetrics.invalidate(); + regionServersSize.invalidate(); + return super.triggerNow(); } @Override @@ -395,21 +429,40 @@ public class QuotaCache implements Stoppable { * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor. */ private void updateQuotaFactors() { - // Update machine quota factor - ClusterMetrics clusterMetrics; - try { - clusterMetrics = rsServices.getConnection().getAdmin() - .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)); - } catch (IOException e) { - LOG.warn("Failed to get cluster metrics needed for updating quotas", e); - return; + boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty() + || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters); + if (hasTableQuotas) { + updateTableMachineQuotaFactors(); + } else { + updateOnlyMachineQuotaFactors(); } + } - int rsSize = clusterMetrics.getServersName().size(); - if (rsSize != 0) { - // TODO if use rs group, the cluster limit should be shared by the rs group - machineQuotaFactor = 1.0 / rsSize; + /** + * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if + * we don't have any table quotas in the cache. + */ + private void updateOnlyMachineQuotaFactors() { + Optional<Integer> rsSize = regionServersSize.get(); + if (rsSize.isPresent()) { + updateMachineQuotaFactors(rsSize.get()); + } else { + regionServersSize.refresh(); } + } + + /** + * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine + * factors as well. This relies on a more expensive query for ClusterMetrics. + */ + private void updateTableMachineQuotaFactors() { + Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get(); + if (!clusterMetricsMaybe.isPresent()) { + tableRegionStatesClusterMetrics.refresh(); + return; + } + ClusterMetrics clusterMetrics = clusterMetricsMaybe.get(); + updateMachineQuotaFactors(clusterMetrics.getServersName().size()); Map<TableName, RegionStatesCount> tableRegionStatesCount = clusterMetrics.getTableRegionStatesCount(); @@ -436,6 +489,53 @@ public class QuotaCache implements Stoppable { } } } + + private void updateMachineQuotaFactors(int rsSize) { + if (rsSize != 0) { + // TODO if use rs group, the cluster limit should be shared by the rs group + machineQuotaFactor = 1.0 / rsSize; + } + } + } + + static class RefreshableExpiringValueCache<T> { + private final String name; + private final LoadingCache<String, Optional<T>> cache; + + RefreshableExpiringValueCache(String name, Duration refreshPeriod, + ThrowingSupplier<T> supplier) { + this.name = name; + this.cache = + CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS) + .build(new CacheLoader<>() { + @Override + public Optional<T> load(String key) { + try { + return Optional.of(supplier.get()); + } catch (Exception e) { + LOG.warn("Failed to refresh cache {}", name, e); + return Optional.empty(); + } + } + }); + } + + Optional<T> get() { + return cache.getUnchecked(name); + } + + void refresh() { + cache.refresh(name); + } + + void invalidate() { + cache.invalidate(name); + } + } + + @FunctionalInterface + static interface ThrowingSupplier<T> { + T get() throws Exception; } static interface Fetcher<Key, Value> { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java index 6078c687d3d..a3ec9799436 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java @@ -117,6 +117,10 @@ public class UserQuotaState extends QuotaState { namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas); } + public boolean hasTableLimiters() { + return tableLimiters != null && !tableLimiters.isEmpty(); + } + private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key, final Quotas quotas) { if (limiters == null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java index 1c431858291..09e15236912 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java @@ -40,7 +40,7 @@ public class TestQuotaCache { HBaseClassTestRule.forClass(TestQuotaCache.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final int REFRESH_TIME = 30_000; + private static final int REFRESH_TIME_MS = 1000; @After public void tearDown() throws Exception { @@ -52,7 +52,7 @@ public class TestQuotaCache { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); - TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME_MS); TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1000); TEST_UTIL.startMiniCluster(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java index 0ba4472ac5b..683d189b761 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java @@ -78,39 +78,32 @@ public class TestQuotaUserOverride { @Test public void testUserGlobalThrottleWithCustomOverride() throws Exception { final Admin admin = TEST_UTIL.getAdmin(); - final String userOverrideWithQuota = User.getCurrent().getShortName() + "123"; + final String userOverrideWithQuota = User.getCurrent().getShortName(); // Add 6req/min limit admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota, ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES)); + ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); Table tableWithThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null) .setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build(); Table tableWithoutThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null) - .setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY, - Bytes.toBytes(userOverrideWithQuota)) - .build(); - Table tableWithoutThrottle2 = - TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build(); + .setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes("anotherUser")).build(); // warm things up doPuts(10, FAMILY, QUALIFIER, tableWithThrottle); doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle); - doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2); // should reject some requests assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle)); // should accept all puts assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle)); - // should accept all puts - assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2)); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota)); - Thread.sleep(60_000); + ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle)); assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle)); - assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2)); } }
