This is an automated email from the ASF dual-hosted git repository. cconnell pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new d25b13f6c6f HBASE-29573: Fully load QuotaCache instead of reading individual rows on demand (#7282) d25b13f6c6f is described below commit d25b13f6c6f4165eda62935594b625eb3f7da082 Author: Charles Connell <cconn...@apache.org> AuthorDate: Fri Sep 12 10:52:49 2025 -0400 HBASE-29573: Fully load QuotaCache instead of reading individual rows on demand (#7282) Signed-off by: Ray Mattingly <rmattin...@apache.org> --- .../apache/hadoop/hbase/quotas/QuotaTableUtil.java | 31 --- .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 302 ++++++++------------- .../org/apache/hadoop/hbase/quotas/QuotaState.java | 38 +-- .../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 163 +++++------ .../apache/hadoop/hbase/quotas/UserQuotaState.java | 22 +- .../hadoop/hbase/quotas/TestAtomicReadQuota.java | 1 - .../hbase/quotas/TestBlockBytesScannedQuota.java | 1 - .../quotas/TestClusterScopeQuotaThrottle.java | 1 - .../hbase/quotas/TestDefaultAtomicQuota.java | 1 - .../hbase/quotas/TestDefaultHandlerUsageQuota.java | 1 - .../hadoop/hbase/quotas/TestDefaultQuota.java | 7 +- .../apache/hadoop/hbase/quotas/TestQuotaCache.java | 40 +-- .../hadoop/hbase/quotas/TestQuotaCache2.java | 130 +++++++++ .../apache/hadoop/hbase/quotas/TestQuotaState.java | 58 +--- .../hadoop/hbase/quotas/TestQuotaThrottle.java | 1 - .../hadoop/hbase/quotas/TestQuotaUserOverride.java | 1 - .../hbase/quotas/TestThreadHandlerUsageQuota.java | 8 +- 17 files changed, 362 insertions(+), 444 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java index 1afb15c0ac6..4bdf5e5af04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -206,37 +206,6 @@ public class QuotaTableUtil { return 
quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier)); } - public static Get makeGetForTableQuotas(final TableName table) { - Get get = new Get(getTableRowKey(table)); - get.addFamily(QUOTA_FAMILY_INFO); - return get; - } - - public static Get makeGetForNamespaceQuotas(final String namespace) { - Get get = new Get(getNamespaceRowKey(namespace)); - get.addFamily(QUOTA_FAMILY_INFO); - return get; - } - - public static Get makeGetForRegionServerQuotas(final String regionServer) { - Get get = new Get(getRegionServerRowKey(regionServer)); - get.addFamily(QUOTA_FAMILY_INFO); - return get; - } - - public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables, - final Iterable<String> namespaces) { - Get get = new Get(getUserRowKey(user)); - get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); - for (final TableName table : tables) { - get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserTable(table)); - } - for (final String ns : namespaces) { - get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserNamespace(ns)); - } - return get; - } - public static Scan makeScan(final QuotaFilter filter) { Scan scan = new Scan(); scan.addFamily(QUOTA_FAMILY_INFO); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 2ec9d049f7d..16681eb45f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -19,30 +19,23 @@ package org.apache.hadoop.hbase.quotas; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.EnumSet; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import 
java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RegionStatesCount; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -73,18 +66,15 @@ public class QuotaCache implements Stoppable { public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY = "hbase.quota.user.override.key"; private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours - private static final int EVICT_PERIOD_FACTOR = 5; - // for testing purpose only, enforce the cache to be always refreshed - static boolean TEST_FORCE_REFRESH = false; - // for testing purpose only, block cache refreshes to reliably verify state - static boolean TEST_BLOCK_REFRESH = false; + private final Object initializerLock = new Object(); + private volatile boolean initialized = false; + + private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>(); + private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>(); + private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>(); + private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>(); - private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>(); - 
private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>(); - private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>(); - private final ConcurrentMap<String, QuotaState> regionServerQuotaCache = - new ConcurrentHashMap<>(); private volatile boolean exceedThrottleQuotaEnabled = false; // factors used to divide cluster scope quota into machine scope quota private volatile double machineQuotaFactor = 1; @@ -96,62 +86,6 @@ public class QuotaCache implements Stoppable { private QuotaRefresherChore refreshChore; private boolean stopped = true; - private final Fetcher<String, UserQuotaState> userQuotaStateFetcher = - new Fetcher<String, UserQuotaState>() { - @Override - public Get makeGet(final String user) { - final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); - final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); - return QuotaUtil.makeGetForUserQuotas(user, tables, namespaces); - } - - @Override - public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException { - return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, tableMachineQuotaFactors, - machineQuotaFactor); - } - }; - - private final Fetcher<String, QuotaState> regionServerQuotaStateFetcher = - new Fetcher<String, QuotaState>() { - @Override - public Get makeGet(final String regionServer) { - return QuotaUtil.makeGetForRegionServerQuotas(regionServer); - } - - @Override - public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { - return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets); - } - }; - - private final Fetcher<TableName, QuotaState> tableQuotaStateFetcher = - new Fetcher<TableName, QuotaState>() { - @Override - public Get makeGet(final TableName table) { - return QuotaUtil.makeGetForTableQuotas(table); - } - - @Override - public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws 
IOException { - return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, - tableMachineQuotaFactors); - } - }; - - private final Fetcher<String, QuotaState> namespaceQuotaStateFetcher = - new Fetcher<String, QuotaState>() { - @Override - public Get makeGet(final String namespace) { - return QuotaUtil.makeGetForNamespaceQuotas(namespace); - } - - @Override - public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { - return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, machineQuotaFactor); - } - }; - public QuotaCache(final RegionServerServices rsServices) { this.rsServices = rsServices; this.userOverrideRequestAttributeKey = @@ -163,10 +97,8 @@ public class QuotaCache implements Stoppable { Configuration conf = rsServices.getConfiguration(); // Refresh the cache every 12 hours, and every time a quota is changed, and every time a - // configuration - // reload is triggered. Periodic reloads are kept to a minimum to avoid flooding the - // RegionServer - // holding the hbase:quota table with requests. + // configuration reload is triggered. Periodic reloads are kept to a minimum to avoid + // flooding the RegionServer holding the hbase:quota table with requests. 
int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); refreshChore = new QuotaRefresherChore(conf, period, this); rsServices.getChoreService().scheduleChore(refreshChore); @@ -186,6 +118,34 @@ public class QuotaCache implements Stoppable { return stopped; } + private void ensureInitialized() { + if (!initialized) { + synchronized (initializerLock) { + if (!initialized) { + refreshChore.chore(); + initialized = true; + } + } + } + } + + private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException { + return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), tableMachineQuotaFactors, + machineQuotaFactor); + } + + private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException { + return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection()); + } + + private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException { + return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), tableMachineQuotaFactors); + } + + private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException { + return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), machineQuotaFactor); + } + /** * Returns the limiter associated to the specified user/table. 
* @param ugi the user to limit @@ -206,12 +166,13 @@ public class QuotaCache implements Stoppable { */ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { String user = getQuotaUserName(ugi); - if (!userQuotaCache.containsKey(user)) { - userQuotaCache.put(user, - QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); - fetch("user", userQuotaCache, userQuotaStateFetcher); + ensureInitialized(); + // local reference because the chore thread may assign to userQuotaCache + Map<String, UserQuotaState> cache = userQuotaCache; + if (!cache.containsKey(user)) { + cache.put(user, QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration())); } - return userQuotaCache.get(user); + return cache.get(user); } /** @@ -220,11 +181,13 @@ public class QuotaCache implements Stoppable { * @return the limiter associated to the specified table */ public QuotaLimiter getTableLimiter(final TableName table) { - if (!tableQuotaCache.containsKey(table)) { - tableQuotaCache.put(table, new QuotaState()); - fetch("table", tableQuotaCache, tableQuotaStateFetcher); + ensureInitialized(); + // local reference because the chore thread may assign to tableQuotaCache + Map<TableName, QuotaState> cache = tableQuotaCache; + if (!cache.containsKey(table)) { + cache.put(table, new QuotaState()); } - return tableQuotaCache.get(table).getGlobalLimiter(); + return cache.get(table).getGlobalLimiter(); } /** @@ -233,11 +196,13 @@ public class QuotaCache implements Stoppable { * @return the limiter associated to the specified namespace */ public QuotaLimiter getNamespaceLimiter(final String namespace) { - if (!namespaceQuotaCache.containsKey(namespace)) { - namespaceQuotaCache.put(namespace, new QuotaState()); - fetch("namespace", namespaceQuotaCache, namespaceQuotaStateFetcher); + ensureInitialized(); + // local reference because the chore thread may assign to namespaceQuotaCache + Map<String, QuotaState> cache = namespaceQuotaCache; + if 
(!cache.containsKey(namespace)) { + cache.put(namespace, new QuotaState()); } - return namespaceQuotaCache.get(namespace).getGlobalLimiter(); + return cache.get(namespace).getGlobalLimiter(); } /** @@ -246,41 +211,19 @@ public class QuotaCache implements Stoppable { * @return the limiter associated to the specified region server */ public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { - if (!regionServerQuotaCache.containsKey(regionServer)) { - regionServerQuotaCache.put(regionServer, new QuotaState()); - fetch("regionServer", regionServerQuotaCache, regionServerQuotaStateFetcher); + ensureInitialized(); + // local reference because the chore thread may assign to regionServerQuotaCache + Map<String, QuotaState> cache = regionServerQuotaCache; + if (!cache.containsKey(regionServer)) { + cache.put(regionServer, new QuotaState()); } - return regionServerQuotaCache.get(regionServer).getGlobalLimiter(); + return cache.get(regionServer).getGlobalLimiter(); } protected boolean isExceedThrottleQuotaEnabled() { return exceedThrottleQuotaEnabled; } - private <K, V extends QuotaState> void fetch(final String type, final Map<K, V> quotasMap, - final Fetcher<K, V> fetcher) { - // Find the quota entries to update - List<Get> gets = quotasMap.keySet().stream().map(fetcher::makeGet).collect(Collectors.toList()); - - // fetch and update the quota entries - if (!gets.isEmpty()) { - try { - for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { - V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); - if (quotaInfo != null) { - quotaInfo.update(entry.getValue()); - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Loading {} key={} quotas={}", type, entry.getKey(), quotaInfo); - } - } - } catch (IOException e) { - LOG.warn("Unable to read {} from quota table", type, e); - } - } - } - /** * Applies a request attribute user override if available, otherwise returns the UGI's short * username @@ -311,18 +254,22 @@ public class 
QuotaCache implements Stoppable { refreshChore.chore(); } + /** visible for testing */ Map<String, QuotaState> getNamespaceQuotaCache() { return namespaceQuotaCache; } + /** visible for testing */ Map<String, QuotaState> getRegionServerQuotaCache() { return regionServerQuotaCache; } + /** visible for testing */ Map<TableName, QuotaState> getTableQuotaCache() { return tableQuotaCache; } + /** visible for testing */ Map<String, UserQuotaState> getUserQuotaCache() { return userQuotaCache; } @@ -359,38 +306,44 @@ public class QuotaCache implements Stoppable { } @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES", - justification = "I do not understand why the complaints, it looks good to me -- FIX") protected void chore() { - while (TEST_BLOCK_REFRESH) { - LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false"); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + updateQuotaFactors(); + + try { + Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries()); + updateNewCacheFromOld(userQuotaCache, newUserQuotaCache); + userQuotaCache = newUserQuotaCache; + } catch (IOException e) { + LOG.error("Error while fetching user quotas", e); } - // Prefetch online tables/namespaces - for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) { - if (table.isSystemTable()) { - continue; - } - QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState()); - final String ns = table.getNamespaceAsString(); + try { + Map<String, QuotaState> newRegionServerQuotaCache = + new HashMap<>(fetchRegionServerQuotaStateEntries()); + updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache); + regionServerQuotaCache = newRegionServerQuotaCache; + } catch (IOException e) { + LOG.error("Error while fetching region server quotas", e); + } - 
QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState()); + try { + Map<TableName, QuotaState> newTableQuotaCache = + new HashMap<>(fetchTableQuotaStateEntries()); + updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache); + tableQuotaCache = newTableQuotaCache; + } catch (IOException e) { + LOG.error("Error while refreshing table quotas", e); } - QuotaCache.this.regionServerQuotaCache - .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState()); + try { + Map<String, QuotaState> newNamespaceQuotaCache = + new HashMap<>(fetchNamespaceQuotaStateEntries()); + updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache); + namespaceQuotaCache = newNamespaceQuotaCache; + } catch (IOException e) { + LOG.error("Error while refreshing namespace quotas", e); + } - updateQuotaFactors(); - fetchAndEvict("namespace", QuotaCache.this.namespaceQuotaCache, namespaceQuotaStateFetcher); - fetchAndEvict("table", QuotaCache.this.tableQuotaCache, tableQuotaStateFetcher); - fetchAndEvict("user", QuotaCache.this.userQuotaCache, userQuotaStateFetcher); - fetchAndEvict("regionServer", QuotaCache.this.regionServerQuotaCache, - regionServerQuotaStateFetcher); fetchExceedThrottleQuota(); } @@ -403,48 +356,6 @@ public class QuotaCache implements Stoppable { } } - private <K, V extends QuotaState> void fetchAndEvict(final String type, - final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { - long now = EnvironmentEdgeManager.currentTime(); - long evictPeriod = getPeriod() * EVICT_PERIOD_FACTOR; - // Find the quota entries to update - List<Get> gets = new ArrayList<>(); - List<K> toRemove = new ArrayList<>(); - for (Map.Entry<K, V> entry : quotasMap.entrySet()) { - long lastQuery = entry.getValue().getLastQuery(); - if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { - toRemove.add(entry.getKey()); - } else { - gets.add(fetcher.makeGet(entry.getKey())); - } - } - - for (final K key : toRemove) { - if 
(LOG.isTraceEnabled()) { - LOG.trace("evict " + type + " key=" + key); - } - quotasMap.remove(key); - } - - // fetch and update the quota entries - if (!gets.isEmpty()) { - try { - for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { - V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); - if (quotaInfo != null) { - quotaInfo.update(entry.getValue()); - } - - if (LOG.isTraceEnabled()) { - LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); - } - } - } catch (IOException e) { - LOG.warn("Unable to read " + type + " from quota table", e); - } - } - } - /** * Update quota factors which is used to divide cluster scope quota into machine scope quota For * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user @@ -520,6 +431,20 @@ public class QuotaCache implements Stoppable { } } + /** visible for testing */ + static <K, V extends QuotaState> void updateNewCacheFromOld(Map<K, V> oldCache, + Map<K, V> newCache) { + for (Map.Entry<K, V> entry : oldCache.entrySet()) { + K key = entry.getKey(); + if (newCache.containsKey(key)) { + V newState = newCache.get(key); + V oldState = entry.getValue(); + oldState.update(newState); + newCache.put(key, oldState); + } + } + } + static class RefreshableExpiringValueCache<T> { private final String name; private final LoadingCache<String, Optional<T>> cache; @@ -560,9 +485,4 @@ public class QuotaCache implements Stoppable { T get() throws Exception; } - interface Fetcher<Key, Value> { - Get makeGet(Key key); - - Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java index 7c9445e1558..61aa9d7f068 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java 
@@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.quotas; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -32,33 +31,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; justification = "FindBugs seems confused; says globalLimiter and lastUpdate " + "are mostly synchronized...but to me it looks like they are totally synchronized") public class QuotaState { - protected long lastUpdate = 0; - protected long lastQuery = 0; - protected QuotaLimiter globalLimiter = NoopQuotaLimiter.get(); - public QuotaState() { - this(0); - } - - public QuotaState(final long updateTs) { - lastUpdate = updateTs; - } - - public synchronized long getLastUpdate() { - return lastUpdate; - } - - public synchronized long getLastQuery() { - return lastQuery; - } - @Override public synchronized String toString() { StringBuilder builder = new StringBuilder(); - builder.append("QuotaState(ts=" + getLastUpdate()); + builder.append("QuotaState("); if (isBypass()) { - builder.append(" bypass"); + builder.append("bypass"); } else { if (globalLimiter != NoopQuotaLimiter.get()) { // builder.append(" global-limiter"); @@ -85,6 +65,11 @@ public class QuotaState { } } + /** visible for testing */ + void setGlobalLimiter(QuotaLimiter globalLimiter) { + this.globalLimiter = globalLimiter; + } + /** * Perform an update of the quota info based on the other quota info object. 
(This operation is * executed by the QuotaCache) @@ -97,7 +82,6 @@ public class QuotaState { } else { globalLimiter = QuotaLimiterFactory.update(globalLimiter, other.globalLimiter); } - lastUpdate = other.lastUpdate; } /** @@ -105,15 +89,7 @@ public class QuotaState { * @return the quota limiter */ public synchronized QuotaLimiter getGlobalLimiter() { - lastQuery = EnvironmentEdgeManager.currentTime(); return globalLimiter; } - /** - * Return the limiter associated with this quota without updating internal last query stats - * @return the quota limiter - */ - synchronized QuotaLimiter getGlobalLimiterWithoutUpdatingLastQuery() { - return globalLimiter; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 3ef704b666b..6b38635eccc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -39,10 +39,11 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -329,59 +330,56 @@ public class QuotaUtil extends QuotaTableUtil { } public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection, - final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor) - throws IOException { - long nowTs = EnvironmentEdgeManager.currentTime(); - 
Result[] results = doGet(connection, gets); - - Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length); - for (int i = 0; i < results.length; ++i) { - byte[] key = gets.get(i).getRow(); - assert isUserRowKey(key); - String user = getUserFromRowKey(key); - - if (results[i].isEmpty()) { - userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration(), nowTs)); - continue; - } - - final UserQuotaState quotaInfo = new UserQuotaState(nowTs); - userQuotas.put(user, quotaInfo); - - assert Bytes.equals(key, results[i].getRow()); - - try { - parseUserResult(user, results[i], new UserQuotasVisitor() { - @Override - public void visitUserQuotas(String userName, String namespace, Quotas quotas) { - quotas = updateClusterQuotaToMachineQuota(quotas, factor); - quotaInfo.setQuotas(namespace, quotas); + Map<TableName, Double> tableMachineQuotaFactors, double factor) throws IOException { + Map<String, UserQuotaState> userQuotas = new HashMap<>(); + try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { + Scan scan = new Scan(); + scan.addFamily(QUOTA_FAMILY_INFO); + scan.setStartStopRowForPrefixScan(QUOTA_USER_ROW_KEY_PREFIX); + try (ResultScanner resultScanner = table.getScanner(scan)) { + for (Result result : resultScanner) { + byte[] key = result.getRow(); + assert isUserRowKey(key); + String user = getUserFromRowKey(key); + + final UserQuotaState quotaInfo = new UserQuotaState(); + userQuotas.put(user, quotaInfo); + + try { + parseUserResult(user, result, new UserQuotasVisitor() { + @Override + public void visitUserQuotas(String userName, String namespace, Quotas quotas) { + quotas = updateClusterQuotaToMachineQuota(quotas, factor); + quotaInfo.setQuotas(namespace, quotas); + } + + @Override + public void visitUserQuotas(String userName, TableName table, Quotas quotas) { + quotas = updateClusterQuotaToMachineQuota(quotas, + tableMachineQuotaFactors.containsKey(table) + ? 
tableMachineQuotaFactors.get(table) + : 1); + quotaInfo.setQuotas(table, quotas); + } + + @Override + public void visitUserQuotas(String userName, Quotas quotas) { + quotas = updateClusterQuotaToMachineQuota(quotas, factor); + quotaInfo.setQuotas(quotas); + } + }); + } catch (IOException e) { + LOG.error("Unable to parse user '" + user + "' quotas", e); + userQuotas.remove(user); } - - @Override - public void visitUserQuotas(String userName, TableName table, Quotas quotas) { - quotas = updateClusterQuotaToMachineQuota(quotas, - tableMachineQuotaFactors.containsKey(table) - ? tableMachineQuotaFactors.get(table) - : 1); - quotaInfo.setQuotas(table, quotas); - } - - @Override - public void visitUserQuotas(String userName, Quotas quotas) { - quotas = updateClusterQuotaToMachineQuota(quotas, factor); - quotaInfo.setQuotas(quotas); - } - }); - } catch (IOException e) { - LOG.error("Unable to parse user '" + user + "' quotas", e); - userQuotas.remove(user); + } } } + return userQuotas; } - protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf, long nowTs) { + protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) { QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder(); buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM) @@ -405,7 +403,7 @@ public class QuotaUtil extends QuotaTableUtil { buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS) .ifPresent(throttleBuilder::setReqHandlerUsageMs); - UserQuotaState state = new UserQuotaState(nowTs); + UserQuotaState state = new UserQuotaState(); QuotaProtos.Quotas defaultQuotas = QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build(); state.setQuotas(defaultQuotas); @@ -422,8 +420,11 @@ public class QuotaUtil extends QuotaTableUtil { } public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection, - final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws 
IOException { - return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() { + Map<TableName, Double> tableMachineFactors) throws IOException { + Scan scan = new Scan(); + scan.addFamily(QUOTA_FAMILY_INFO); + scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX); + return fetchGlobalQuotas("table", scan, connection, new KeyFromRow<TableName>() { @Override public TableName getKeyFromRow(final byte[] row) { assert isTableRowKey(row); @@ -438,8 +439,11 @@ public class QuotaUtil extends QuotaTableUtil { } public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection, - final List<Get> gets, double factor) throws IOException { - return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() { + double factor) throws IOException { + Scan scan = new Scan(); + scan.addFamily(QUOTA_FAMILY_INFO); + scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX); + return fetchGlobalQuotas("namespace", scan, connection, new KeyFromRow<String>() { @Override public String getKeyFromRow(final byte[] row) { assert isNamespaceRowKey(row); @@ -453,9 +457,12 @@ public class QuotaUtil extends QuotaTableUtil { }); } - public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection, - final List<Get> gets) throws IOException { - return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() { + public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection) + throws IOException { + Scan scan = new Scan(); + scan.addFamily(QUOTA_FAMILY_INFO); + scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX); + return fetchGlobalQuotas("regionServer", scan, connection, new KeyFromRow<String>() { @Override public String getKeyFromRow(final byte[] row) { assert isRegionServerRowKey(row); @@ -469,32 +476,34 @@ public class QuotaUtil extends QuotaTableUtil { }); } - public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, - 
final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException { - long nowTs = EnvironmentEdgeManager.currentTime(); - Result[] results = doGet(connection, gets); + public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final Scan scan, + final Connection connection, final KeyFromRow<K> kfr) throws IOException { - Map<K, QuotaState> globalQuotas = new HashMap<>(results.length); - for (int i = 0; i < results.length; ++i) { - byte[] row = gets.get(i).getRow(); - K key = kfr.getKeyFromRow(row); + Map<K, QuotaState> globalQuotas = new HashMap<>(); + try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { + try (ResultScanner resultScanner = table.getScanner(scan)) { + for (Result result : resultScanner) { - QuotaState quotaInfo = new QuotaState(nowTs); - globalQuotas.put(key, quotaInfo); + byte[] row = result.getRow(); + K key = kfr.getKeyFromRow(row); - if (results[i].isEmpty()) continue; - assert Bytes.equals(row, results[i].getRow()); + QuotaState quotaInfo = new QuotaState(); + globalQuotas.put(key, quotaInfo); - byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); - if (data == null) continue; + byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); + if (data == null) { + continue; + } - try { - Quotas quotas = quotasFromData(data); - quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key)); - quotaInfo.setQuotas(quotas); - } catch (IOException e) { - LOG.error("Unable to parse " + type + " '" + key + "' quotas", e); - globalQuotas.remove(key); + try { + Quotas quotas = quotasFromData(data); + quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key)); + quotaInfo.setQuotas(quotas); + } catch (IOException e) { + LOG.error("Unable to parse {} '{}' quotas", type, key, e); + globalQuotas.remove(key); + } + } } } return globalQuotas; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java index a3ec9799436..877ad195c71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -42,24 +41,18 @@ public class UserQuotaState extends QuotaState { private Map<TableName, QuotaLimiter> tableLimiters = null; private boolean bypassGlobals = false; - public UserQuotaState() { - super(); - } - - public UserQuotaState(final long updateTs) { - super(updateTs); - } - @Override public synchronized String toString() { StringBuilder builder = new StringBuilder(); - builder.append("UserQuotaState(ts=" + getLastUpdate()); - if (bypassGlobals) builder.append(" bypass-globals"); + builder.append("UserQuotaState("); + if (bypassGlobals) { + builder.append("bypass-globals"); + } if (isBypass()) { builder.append(" bypass"); } else { - if (getGlobalLimiterWithoutUpdatingLastQuery() != NoopQuotaLimiter.get()) { + if (getGlobalLimiter() != NoopQuotaLimiter.get()) { builder.append(" global-limiter"); } @@ -86,7 +79,7 @@ public class UserQuotaState extends QuotaState { /** Returns true if there is no quota information associated to this object */ @Override public synchronized boolean isBypass() { - return !bypassGlobals && getGlobalLimiterWithoutUpdatingLastQuery() == NoopQuotaLimiter.get() + return !bypassGlobals && getGlobalLimiter() == NoopQuotaLimiter.get() && (tableLimiters == null || tableLimiters.isEmpty()) && (namespaceLimiters == null || namespaceLimiters.isEmpty()); } @@ -191,7 +184,6 @@ public class UserQuotaState extends QuotaState { * @return the quota limiter for the specified 
table */ public synchronized QuotaLimiter getTableLimiter(final TableName table) { - lastQuery = EnvironmentEdgeManager.currentTime(); if (tableLimiters != null) { QuotaLimiter limiter = tableLimiters.get(table); if (limiter != null) return limiter; @@ -200,6 +192,6 @@ public class UserQuotaState extends QuotaState { QuotaLimiter limiter = namespaceLimiters.get(table.getNamespaceAsString()); if (limiter != null) return limiter; } - return getGlobalLimiterWithoutUpdatingLastQuery(); + return getGlobalLimiter(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java index 12bbc26d364..8a001d02fef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java @@ -78,7 +78,6 @@ public class TestAtomicReadQuota { TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index de3600b9ee9..851aeae3164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -84,7 +84,6 @@ public class TestBlockBytesScannedQuota { TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java index 7d0c566f02b..c881a2e8888 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java @@ -77,7 +77,6 @@ public class TestClusterScopeQuotaThrottle { TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); TEST_UTIL.startMiniCluster(2); TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; tables = new Table[TABLE_NAMES.length]; for (int i = 0; i < TABLE_NAMES.length; ++i) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java index 31840cb8d2f..7074ae79b7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java @@ -75,7 +75,6 @@ public class TestDefaultAtomicQuota { TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; TEST_UTIL.flush(TABLE_NAME); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java index abd60460e1f..09e19395c3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java @@ -72,7 +72,6 @@ public class TestDefaultHandlerUsageQuota { TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); TEST_UTIL.createTable(TABLE_NAME, FAMILY); 
TEST_UTIL.waitTableAvailable(TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; TEST_UTIL.flush(TABLE_NAME); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultQuota.java index 7966a7bc447..96ecbe44569 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultQuota.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.After; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -59,8 +59,8 @@ public class TestDefaultQuota { TEST_UTIL.shutdownMiniCluster(); } - @BeforeClass - public static void setUpBeforeClass() throws Exception { + @Before + public void setUp() throws Exception { // quotas enabled, using block bytes scanned TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); @@ -73,7 +73,6 @@ public class TestDefaultQuota { TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; try (Admin admin = TEST_UTIL.getAdmin()) { ThrottleQuotaTestUtil.doPuts(1_000, FAMILY, QUALIFIER, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java index f4f876f104c..fa07ab5345d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache.java @@ 
-17,7 +17,6 @@ */ package org.apache.hadoop.hbase.quotas; -import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; import static org.junit.Assert.assertEquals; import java.util.concurrent.TimeUnit; @@ -29,8 +28,8 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -45,15 +44,15 @@ public class TestQuotaCache { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final int REFRESH_TIME_MS = 1000; - @AfterClass - public static void tearDown() throws Exception { + @After + public void tearDown() throws Exception { ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); EnvironmentEdgeManager.reset(); TEST_UTIL.shutdownMiniCluster(); } - @BeforeClass - public static void setUpBeforeClass() throws Exception { + @Before + public void setUp() throws Exception { TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME_MS); TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1000); @@ -62,33 +61,6 @@ public class TestQuotaCache { TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); } - @Test - public void testDefaultUserRefreshFrequency() throws Exception { - QuotaCache.TEST_BLOCK_REFRESH = true; - - QuotaCache quotaCache = - ThrottleQuotaTestUtil.getQuotaCaches(TEST_UTIL).stream().findAny().get(); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - - UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); - - QuotaCache.TEST_BLOCK_REFRESH = 
false; - // new user should have refreshed immediately - TEST_UTIL.waitFor(5_000, () -> userQuotaState.getLastUpdate() != 0); - long lastUpdate = userQuotaState.getLastUpdate(); - - // refresh should not apply to recently refreshed quota - quotaCache.triggerCacheRefresh(); - Thread.sleep(250); - long newLastUpdate = userQuotaState.getLastUpdate(); - assertEquals(lastUpdate, newLastUpdate); - - quotaCache.triggerCacheRefresh(); - waitMinuteQuota(); - // should refresh after time has passed - TEST_UTIL.waitFor(5_000, () -> lastUpdate != userQuotaState.getLastUpdate()); - } - @Test public void testUserQuotaLookup() throws Exception { QuotaCache quotaCache = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java new file mode 100644 index 00000000000..2c33b265771 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; + +/** + * Tests of QuotaCache that don't require a minicluster, unlike in TestQuotaCache + */ +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestQuotaCache2 { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestQuotaCache2.class); + + @Test + public void testPreserveLimiterAvailability() throws Exception { + // establish old cache with a limiter for 100 read bytes per second + QuotaState oldState = new QuotaState(); + Map<String, QuotaState> oldCache = new HashMap<>(); + oldCache.put("my_table", oldState); + QuotaProtos.Throttle throttle1 = QuotaProtos.Throttle.newBuilder() + .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS) + .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build()) + .build(); + QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(throttle1); + oldState.setGlobalLimiter(limiter1); + + // consume one byte from the limiter, so 99 will be left + limiter1.consumeRead(1, 1, false); + + // establish new cache, also with a limiter for 100 read bytes per second + QuotaState newState = new QuotaState(); + Map<String, QuotaState> newCache = new HashMap<>(); + newCache.put("my_table", newState); + QuotaProtos.Throttle throttle2 = QuotaProtos.Throttle.newBuilder() + 
.setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS) + .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build()) + .build(); + QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(throttle2); + newState.setGlobalLimiter(limiter2); + + // update new cache from old cache + QuotaCache.updateNewCacheFromOld(oldCache, newCache); + + // verify that the 99 available bytes from the old limiter were carried over + TimeBasedLimiter updatedLimiter = + (TimeBasedLimiter) newCache.get("my_table").getGlobalLimiter(); + assertEquals(99, updatedLimiter.getReadAvailable()); + } + + @Test + public void testClobberLimiterLimit() throws Exception { + // establish old cache with a limiter for 100 read bytes per second + QuotaState oldState = new QuotaState(); + Map<String, QuotaState> oldCache = new HashMap<>(); + oldCache.put("my_table", oldState); + QuotaProtos.Throttle throttle1 = QuotaProtos.Throttle.newBuilder() + .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS) + .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build()) + .build(); + QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(throttle1); + oldState.setGlobalLimiter(limiter1); + + // establish new cache with a different limit: 50 read bytes per second + QuotaState newState = new QuotaState(); + Map<String, QuotaState> newCache = new HashMap<>(); + newCache.put("my_table", newState); + QuotaProtos.Throttle throttle2 = QuotaProtos.Throttle.newBuilder() + .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS) + .setSoftLimit(50).setScope(QuotaProtos.QuotaScope.MACHINE).build()) + .build(); + QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(throttle2); + newState.setGlobalLimiter(limiter2); + + // update new cache from old cache + QuotaCache.updateNewCacheFromOld(oldCache, newCache); + + // verify that the new limit of 50 replaced the old limit of 100 + TimeBasedLimiter
updatedLimiter = + (TimeBasedLimiter) newCache.get("my_table").getGlobalLimiter(); + assertEquals(50, updatedLimiter.getReadLimit()); + } + + @Test + public void testForgetsDeletedQuota() { + QuotaState oldState = new QuotaState(); + Map<String, QuotaState> oldCache = new HashMap<>(); + oldCache.put("my_table1", oldState); + + QuotaState newState = new QuotaState(); + Map<String, QuotaState> newCache = new HashMap<>(); + newCache.put("my_table2", newState); + + QuotaCache.updateNewCacheFromOld(oldCache, newCache); + + assertTrue(newCache.containsKey("my_table2")); + assertFalse(newCache.containsKey("my_table1")); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index 59b26f3f0d9..ff4b6bc9949 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.quotas; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -81,67 +80,38 @@ public class TestQuotaState { assertThrottleException(quotaInfo.getTableLimiter(tableName), NUM_TABLE_THROTTLE); } - @Test - public void testQuotaStateUpdateBypassThrottle() { - final long LAST_UPDATE = 10; - - UserQuotaState quotaInfo = new UserQuotaState(); - assertEquals(0, quotaInfo.getLastUpdate()); - assertTrue(quotaInfo.isBypass()); - - UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE); - assertEquals(LAST_UPDATE, otherQuotaState.getLastUpdate()); - assertTrue(otherQuotaState.isBypass()); - - quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE, quotaInfo.getLastUpdate()); - assertTrue(quotaInfo.isBypass()); - assertTrue(quotaInfo.getGlobalLimiter() == quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); - 
assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); - } - @Test public void testQuotaStateUpdateGlobalThrottle() { final int NUM_GLOBAL_THROTTLE_1 = 3; final int NUM_GLOBAL_THROTTLE_2 = 11; - final long LAST_UPDATE_1 = 10; - final long LAST_UPDATE_2 = 20; - final long LAST_UPDATE_3 = 30; QuotaState quotaInfo = new QuotaState(); - assertEquals(0, quotaInfo.getLastUpdate()); assertTrue(quotaInfo.isBypass()); // Add global throttle - QuotaState otherQuotaState = new QuotaState(LAST_UPDATE_1); + QuotaState otherQuotaState = new QuotaState(); otherQuotaState.setQuotas(buildReqNumThrottle(NUM_GLOBAL_THROTTLE_1)); - assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate()); assertFalse(otherQuotaState.isBypass()); quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate()); assertFalse(quotaInfo.isBypass()); assertThrottleException(quotaInfo.getGlobalLimiter(), NUM_GLOBAL_THROTTLE_1); // Update global Throttle - otherQuotaState = new QuotaState(LAST_UPDATE_2); + otherQuotaState = new QuotaState(); otherQuotaState.setQuotas(buildReqNumThrottle(NUM_GLOBAL_THROTTLE_2)); - assertEquals(LAST_UPDATE_2, otherQuotaState.getLastUpdate()); assertFalse(otherQuotaState.isBypass()); quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_2, quotaInfo.getLastUpdate()); assertFalse(quotaInfo.isBypass()); assertThrottleException(quotaInfo.getGlobalLimiter(), NUM_GLOBAL_THROTTLE_2 - NUM_GLOBAL_THROTTLE_1); // Remove global throttle - otherQuotaState = new QuotaState(LAST_UPDATE_3); - assertEquals(LAST_UPDATE_3, otherQuotaState.getLastUpdate()); + otherQuotaState = new QuotaState(); assertTrue(otherQuotaState.isBypass()); quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_3, quotaInfo.getLastUpdate()); assertTrue(quotaInfo.isBypass()); assertNoopLimiter(quotaInfo.getGlobalLimiter()); } @@ -155,37 +125,29 @@ public class TestQuotaState { final int TABLE_A_THROTTLE_2 = 11; final int TABLE_B_THROTTLE = 4; final int 
TABLE_C_THROTTLE = 5; - final long LAST_UPDATE_1 = 10; - final long LAST_UPDATE_2 = 20; - final long LAST_UPDATE_3 = 30; UserQuotaState quotaInfo = new UserQuotaState(); - assertEquals(0, quotaInfo.getLastUpdate()); assertTrue(quotaInfo.isBypass()); // Add A B table limiters - UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE_1); + UserQuotaState otherQuotaState = new UserQuotaState(); otherQuotaState.setQuotas(tableNameA, buildReqNumThrottle(TABLE_A_THROTTLE_1)); otherQuotaState.setQuotas(tableNameB, buildReqNumThrottle(TABLE_B_THROTTLE)); - assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate()); assertFalse(otherQuotaState.isBypass()); quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate()); assertFalse(quotaInfo.isBypass()); assertThrottleException(quotaInfo.getTableLimiter(tableNameA), TABLE_A_THROTTLE_1); assertThrottleException(quotaInfo.getTableLimiter(tableNameB), TABLE_B_THROTTLE); assertNoopLimiter(quotaInfo.getTableLimiter(tableNameC)); // Add C, Remove B, Update A table limiters - otherQuotaState = new UserQuotaState(LAST_UPDATE_2); + otherQuotaState = new UserQuotaState(); otherQuotaState.setQuotas(tableNameA, buildReqNumThrottle(TABLE_A_THROTTLE_2)); otherQuotaState.setQuotas(tableNameC, buildReqNumThrottle(TABLE_C_THROTTLE)); - assertEquals(LAST_UPDATE_2, otherQuotaState.getLastUpdate()); assertFalse(otherQuotaState.isBypass()); quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_2, quotaInfo.getLastUpdate()); assertFalse(quotaInfo.isBypass()); assertThrottleException(quotaInfo.getTableLimiter(tableNameA), TABLE_A_THROTTLE_2 - TABLE_A_THROTTLE_1); @@ -193,12 +155,10 @@ public class TestQuotaState { assertNoopLimiter(quotaInfo.getTableLimiter(tableNameB)); // Remove table limiters - otherQuotaState = new UserQuotaState(LAST_UPDATE_3); - assertEquals(LAST_UPDATE_3, otherQuotaState.getLastUpdate()); + otherQuotaState = new UserQuotaState(); assertTrue(otherQuotaState.isBypass()); 
quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_3, quotaInfo.getLastUpdate()); assertTrue(quotaInfo.isBypass()); assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); } @@ -207,20 +167,16 @@ public class TestQuotaState { public void testTableThrottleWithBatch() { final TableName TABLE_A = TableName.valueOf("TableA"); final int TABLE_A_THROTTLE_1 = 3; - final long LAST_UPDATE_1 = 10; UserQuotaState quotaInfo = new UserQuotaState(); - assertEquals(0, quotaInfo.getLastUpdate()); assertTrue(quotaInfo.isBypass()); // Add A table limiters - UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE_1); + UserQuotaState otherQuotaState = new UserQuotaState(); otherQuotaState.setQuotas(TABLE_A, buildReqNumThrottle(TABLE_A_THROTTLE_1)); - assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate()); assertFalse(otherQuotaState.isBypass()); quotaInfo.update(otherQuotaState); - assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate()); assertFalse(quotaInfo.isBypass()); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java index 5ae9de1fbf1..66996f36661 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java @@ -88,7 +88,6 @@ public class TestQuotaThrottle { TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); TEST_UTIL.startMiniCluster(1); TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; tables = new Table[TABLE_NAMES.length]; for (int i = 0; i < TABLE_NAMES.length; ++i) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java 
index 683d189b761..7917f3c0847 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java @@ -65,7 +65,6 @@ public class TestQuotaUserOverride { CUSTOM_OVERRIDE_KEY); TEST_UTIL.startMiniCluster(NUM_SERVERS); TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; TEST_UTIL.createTable(TABLE_NAME, FAMILY); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java index 8a9863132e8..58b15ec2429 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.quotas; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -74,7 +75,6 @@ public class TestThreadHandlerUsageQuota { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - QuotaCache.TEST_FORCE_REFRESH = true; TEST_UTIL.flush(TABLE_NAME); } @@ -104,11 +104,12 @@ public class TestThreadHandlerUsageQuota { } } - private void configureThrottle() throws IOException { + private void configureThrottle() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(), - ThrottleType.REQUEST_HANDLER_USAGE_MS, 10000, TimeUnit.SECONDS)); + ThrottleType.REQUEST_HANDLER_USAGE_MS, 1, TimeUnit.SECONDS)); } + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); } private void unthrottleUser() throws Exception { @@ -116,6 +117,7 @@ public class TestThreadHandlerUsageQuota { 
admin.setQuota(QuotaSettingsFactory.unthrottleUserByThrottleType(getUserName(), ThrottleType.REQUEST_HANDLER_USAGE_MS)); } + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); } private static String getUserName() throws IOException {