This is an automated email from the ASF dual-hosted git repository.
rmattingly pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 3757b63373c HBASE-28963 Updating Quota Factors is too expensive (#6451) (#6467)
3757b63373c is described below
commit 3757b63373c8459720ee09cf30c8028e45fa6bdd
Author: Ray Mattingly <[email protected]>
AuthorDate: Wed Nov 13 17:59:55 2024 -0500
HBASE-28963 Updating Quota Factors is too expensive (#6451) (#6467)
Signed-off-by: Nick Dimiduk <[email protected]>
Co-authored-by: Ray Mattingly <[email protected]>
---
.../org/apache/hadoop/hbase/quotas/QuotaCache.java | 134 ++++++++++++++++++---
.../apache/hadoop/hbase/quotas/UserQuotaState.java | 4 +
.../apache/hadoop/hbase/quotas/TestQuotaCache.java | 4 +-
.../hadoop/hbase/quotas/TestQuotaUserOverride.java | 15 +--
4 files changed, 127 insertions(+), 30 deletions(-)
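The patch introduces two configuration keys, hbase.quota.cache.ttl.region.states.ms and
hbase.quota.cache.ttl.servers.size.ms, which bound how often each RegionServer re-queries
cluster metrics while refreshing quota factors; both fall back to the existing refresh
period (hbase.quota.refresh.period) when unset. A minimal tuning sketch, assuming a plain
HBase Configuration object (the class name and the values below are illustrative only and
not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Hypothetical tuning example for the keys added by this commit.
    public class QuotaCacheTtlTuningExample {
      public static Configuration tunedConf() {
        Configuration conf = HBaseConfiguration.create();
        // Existing key: how often the QuotaRefresherChore runs (ms).
        conf.setInt("hbase.quota.refresh.period", 30_000);
        // New keys: how long the expensive ClusterMetrics and region server
        // count lookups are cached (ms); both default to the refresh period.
        conf.setLong("hbase.quota.cache.ttl.region.states.ms", 300_000L);
        conf.setLong("hbase.quota.cache.ttl.servers.size.ms", 300_000L);
        return conf;
      }
    }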
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index 9b3498ff894..3999ba643a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.quotas;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -28,6 +29,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -48,6 +50,10 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
/**
* Cache that keeps track of the quota settings for the users and tables that are interacting with
* it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
@@ -61,6 +67,10 @@ public class QuotaCache implements Stoppable {
private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
+ public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
+ "hbase.quota.cache.ttl.region.states.ms";
+ public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
+ "hbase.quota.cache.ttl.servers.size.ms";
// defines the request attribute key which, when provided, will override the request's username
// from the perspective of user quotas
@@ -102,7 +112,7 @@ public class QuotaCache implements Stoppable {
// TODO: This will be replaced once we have the notification bus ready.
Configuration conf = rsServices.getConfiguration();
int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
- refreshChore = new QuotaRefresherChore(period, this);
+ refreshChore = new QuotaRefresherChore(conf, period, this);
rsServices.getChoreService().scheduleChore(refreshChore);
}
@@ -140,8 +150,7 @@ public class QuotaCache implements Stoppable {
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
- () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L),
- this::triggerCacheRefresh);
+ () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
}
/**
@@ -202,7 +211,7 @@ public class QuotaCache implements Stoppable {
* returned and the quota request will be enqueued for the next cache refresh.
*/
private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
- return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
+ return computeIfAbsent(quotasMap, key, QuotaState::new);
}
void triggerCacheRefresh() {
@@ -233,8 +242,33 @@ public class QuotaCache implements Stoppable {
private class QuotaRefresherChore extends ScheduledChore {
private long lastUpdate = 0;
- public QuotaRefresherChore(final int period, final Stoppable stoppable) {
+ // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
+ // So we cache the results to reduce that load.
+ private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
+ private final RefreshableExpiringValueCache<Integer> regionServersSize;
+
+ public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
super("QuotaRefresherChore", stoppable, period);
+
+ Duration tableRegionStatesCacheTtl =
+ Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
+ this.tableRegionStatesClusterMetrics =
+ new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
+ tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
+ .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
+
+ Duration regionServersSizeCacheTtl =
+ Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
+ regionServersSize =
+ new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
+ () -> rsServices.getConnection().getAdmin().getRegionServers().size());
+ }
+
+ @Override
+ public synchronized boolean triggerNow() {
+ tableRegionStatesClusterMetrics.invalidate();
+ regionServersSize.invalidate();
+ return super.triggerNow();
}
@Override
@@ -395,21 +429,40 @@ public class QuotaCache implements Stoppable {
* over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
*/
private void updateQuotaFactors() {
- // Update machine quota factor
- ClusterMetrics clusterMetrics;
- try {
- clusterMetrics = rsServices.getConnection().getAdmin()
- .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
- } catch (IOException e) {
- LOG.warn("Failed to get cluster metrics needed for updating quotas",
e);
- return;
+ boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
+ || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
+ if (hasTableQuotas) {
+ updateTableMachineQuotaFactors();
+ } else {
+ updateOnlyMachineQuotaFactors();
}
+ }
- int rsSize = clusterMetrics.getServersName().size();
- if (rsSize != 0) {
- // TODO if use rs group, the cluster limit should be shared by the rs group
- machineQuotaFactor = 1.0 / rsSize;
+ /**
+ * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
+ * we don't have any table quotas in the cache.
+ */
+ private void updateOnlyMachineQuotaFactors() {
+ Optional<Integer> rsSize = regionServersSize.get();
+ if (rsSize.isPresent()) {
+ updateMachineQuotaFactors(rsSize.get());
+ } else {
+ regionServersSize.refresh();
}
+ }
+
+ /**
+ * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
+ * factors as well. This relies on a more expensive query for ClusterMetrics.
+ */
+ private void updateTableMachineQuotaFactors() {
+ Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
+ if (!clusterMetricsMaybe.isPresent()) {
+ tableRegionStatesClusterMetrics.refresh();
+ return;
+ }
+ ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
+ updateMachineQuotaFactors(clusterMetrics.getServersName().size());
Map<TableName, RegionStatesCount> tableRegionStatesCount =
clusterMetrics.getTableRegionStatesCount();
@@ -436,6 +489,53 @@ public class QuotaCache implements Stoppable {
}
}
}
+
+ private void updateMachineQuotaFactors(int rsSize) {
+ if (rsSize != 0) {
+ // TODO if use rs group, the cluster limit should be shared by the rs group
+ machineQuotaFactor = 1.0 / rsSize;
+ }
+ }
+ }
+
+ static class RefreshableExpiringValueCache<T> {
+ private final String name;
+ private final LoadingCache<String, Optional<T>> cache;
+
+ RefreshableExpiringValueCache(String name, Duration refreshPeriod,
+ ThrowingSupplier<T> supplier) {
+ this.name = name;
+ this.cache =
+ CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
+ .build(new CacheLoader<>() {
+ @Override
+ public Optional<T> load(String key) {
+ try {
+ return Optional.of(supplier.get());
+ } catch (Exception e) {
+ LOG.warn("Failed to refresh cache {}", name, e);
+ return Optional.empty();
+ }
+ }
+ });
+ }
+
+ Optional<T> get() {
+ return cache.getUnchecked(name);
+ }
+
+ void refresh() {
+ cache.refresh(name);
+ }
+
+ void invalidate() {
+ cache.invalidate(name);
+ }
+ }
+
+ @FunctionalInterface
+ static interface ThrowingSupplier<T> {
+ T get() throws Exception;
}
static interface Fetcher<Key, Value> {
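
For reference, the new RefreshableExpiringValueCache above wraps a single-entry Guava
LoadingCache keyed by the cache's own name, and a failed load surfaces as Optional.empty()
rather than an exception. A minimal caller sketch, assuming code living in the same
org.apache.hadoop.hbase.quotas package (the class and its constructor are package-private)
and an illustrative supplier in place of the real Admin#getRegionServers() call:

    import java.time.Duration;
    import java.util.Optional;

    // Hypothetical caller mirroring the fallback-to-refresh pattern used by the chore.
    class RegionServerCountExample {
      private final QuotaCache.RefreshableExpiringValueCache<Integer> regionServersSize =
        new QuotaCache.RefreshableExpiringValueCache<>("regionServersSize", Duration.ofMinutes(5),
          () -> 3); // illustrative supplier; the chore queries the Admin API here

      void useCachedValue() {
        Optional<Integer> rsSize = regionServersSize.get();
        if (rsSize.isPresent()) {
          System.out.println("cluster has " + rsSize.get() + " region servers");
        } else {
          // A failed load yields Optional.empty(); trigger a reload for next time.
          regionServersSize.refresh();
        }
      }
    }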
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
index 6078c687d3d..a3ec9799436 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
@@ -117,6 +117,10 @@ public class UserQuotaState extends QuotaState {
namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
}
+ public boolean hasTableLimiters() {
+ return tableLimiters != null && !tableLimiters.isEmpty();
+ }
+
private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key,
final Quotas quotas) {
if (limiters == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java
index 89c77f43b35..9dcf79cd574 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java
@@ -40,7 +40,7 @@ public class TestQuotaCache {
HBaseClassTestRule.forClass(TestQuotaCache.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
- private static final int REFRESH_TIME = 30_000;
+ private static final int REFRESH_TIME_MS = 1000;
@After
public void tearDown() throws Exception {
@@ -52,7 +52,7 @@ public class TestQuotaCache {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
- TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
+ TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME_MS);
TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1000);
TEST_UTIL.startMiniCluster(1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
index 75b3cc3ca84..54931d47122 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java
@@ -78,39 +78,32 @@ public class TestQuotaUserOverride {
@Test
public void testUserGlobalThrottleWithCustomOverride() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
- final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";
+ final String userOverrideWithQuota = User.getCurrent().getShortName();
// Add 6req/min limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
+ ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
Table tableWithThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build();
Table tableWithoutThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
- .setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
- Bytes.toBytes(userOverrideWithQuota))
- .build();
- Table tableWithoutThrottle2 =
- TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build();
+ .setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes("anotherUser")).build();
// warm things up
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
- doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2);
// should reject some requests
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
- // should accept all puts
- assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
- Thread.sleep(60_000);
+ ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
- assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
}
}