// http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
// ----------------------------------------------------------------------
// diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
// new file mode 100644
// index 0000000..db45522
// --- /dev/null
// +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
// @@ -0,0 +1,297 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * Helper class to interact with the quota table.
 * <p>
 * Provides static helpers to store and delete quota settings, to load quota state for users,
 * tables and namespaces, and to measure the size of mutations and results. Row-key, qualifier and
 * (de)serialization helpers used here are not defined in this class (presumably inherited from
 * {@link QuotaTableUtil} -- confirm there).
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Log LOG = LogFactory.getLog(QuotaUtil.class);

  /** Configuration key that switches quota support on or off. */
  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  /** Quota support is off unless {@link #QUOTA_CONF_KEY} is explicitly set. */
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  /** Table descriptor for Quota internal table */
  public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
  static {
    // Both families: local replication scope, row bloom filter, a single version per cell.
    QUOTA_TABLE_DESC.addFamily(new HColumnDescriptor(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1));
    QUOTA_TABLE_DESC.addFamily(new HColumnDescriptor(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1));
  }

  /** Returns true if the support for quota is enabled */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }

  /*
   * =========================================================================
   * Quota "settings" helpers
   */

  /** Stores {@code data} as the quota settings of {@code table}. */
  public static void addTableQuota(final Connection connection, final TableName table,
      final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  /** Deletes the whole quota row of {@code table}. */
  public static void deleteTableQuota(final Connection connection, final TableName table)
      throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  /** Stores {@code data} as the quota settings of {@code namespace}. */
  public static void addNamespaceQuota(final Connection connection, final String namespace,
      final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  /** Deletes the whole quota row of {@code namespace}. */
  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
      throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  /** Stores {@code data} as the settings of {@code user} under the default settings qualifier. */
  public static void addUserQuota(final Connection connection, final String user,
      final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  /** Stores {@code data} in the row of {@code user} under the qualifier for {@code table}. */
  public static void addUserQuota(final Connection connection, final String user,
      final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  /** Stores {@code data} in the row of {@code user} under the qualifier for {@code namespace}. */
  public static void addUserQuota(final Connection connection, final String user,
      final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
      data);
  }

  /** Deletes the whole quota row of {@code user}. */
  public static void deleteUserQuota(final Connection connection, final String user)
      throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  /** Deletes only the column holding the settings of {@code user} for {@code table}. */
  public static void deleteUserQuota(final Connection connection, final String user,
      final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
  }

  /** Deletes only the column holding the settings of {@code user} for {@code namespace}. */
  public static void deleteUserQuota(final Connection connection, final String user,
      final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
  }

  /** Writes {@code data} to {@code rowKey} under the default settings qualifier. */
  private static void addQuotas(final Connection connection, final byte[] rowKey,
      final Quotas data) throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  /** Writes the serialized {@code data} to the info family of {@code rowKey}. */
  private static void addQuotas(final Connection connection, final byte[] rowKey,
      final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  /** Deletes the whole row {@code rowKey}. */
  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
      throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  /**
   * Deletes quota settings from {@code rowKey}.
   * @param qualifier the info-family column to delete, or null to delete the whole row
   */
  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
      final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    doDelete(connection, delete);
  }

  /**
   * Loads the quota state of the users addressed by {@code gets}.
   * <p>
   * Every requested user gets an entry stamped with the current time; a user without a stored row
   * keeps an empty state. A user whose row cannot be parsed is logged and left out of the result.
   * @param connection the connection used to read the quota table
   * @param gets one Get per user row; results are matched to gets by position
   * @return the quota state keyed by user name
   * @throws IOException if the read fails
   */
  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
      final List<Get> gets) throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<String, UserQuotaState> userQuotas = new HashMap<String, UserQuotaState>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] key = gets.get(i).getRow();
      assert isUserRowKey(key);
      String user = getUserFromRowKey(key);

      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
      userQuotas.put(user, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(key, results[i].getRow());

      try {
        // The visitor routes each parsed quota to the matching scope of the user state.
        parseUserResult(user, results[i], new UserQuotasVisitor() {
          @Override
          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
            quotaInfo.setQuotas(namespace, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
            quotaInfo.setQuotas(table, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, Quotas quotas) {
            quotaInfo.setQuotas(quotas);
          }
        });
      } catch (IOException e) {
        LOG.error("Unable to parse user '" + user + "' quotas", e);
        userQuotas.remove(user);
      }
    }
    return userQuotas;
  }

  /** Loads the quota state of the tables addressed by {@code gets}, keyed by table name. */
  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
      final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }
    });
  }

  /** Loads the quota state of the namespaces addressed by {@code gets}, keyed by namespace. */
  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
      final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }
    });
  }

  /**
   * Loads the quota state for the rows addressed by {@code gets}.
   * <p>
   * Every requested key gets an entry stamped with the current time; a key with no stored
   * settings keeps an empty state. A key whose settings cannot be parsed is logged and left out.
   * @param type label used only in the error log message (e.g. "table", "namespace")
   * @param connection the connection used to read the quota table
   * @param gets one Get per row; results are matched to gets by position
   * @param kfr converts a row key into the map key
   * @return the quota state keyed by the converted row key
   * @throws IOException if the read fails
   */
  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
      final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr)
      throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<K, QuotaState> globalQuotas = new HashMap<K, QuotaState>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] row = gets.get(i).getRow();
      K key = kfr.getKeyFromRow(row);

      QuotaState quotaInfo = new QuotaState(nowTs);
      globalQuotas.put(key, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(row, results[i].getRow());

      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
      if (data == null) continue;

      try {
        Quotas quotas = quotasFromData(data);
        quotaInfo.setQuotas(quotas);
      } catch (IOException e) {
        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
        globalQuotas.remove(key);
      }
    }
    return globalQuotas;
  }

  /** Converts a quota-table row key into the key type used by the caller's map. */
  private static interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);
  }

  /*
   * =========================================================================
   * HTable helpers
   */

  /** Applies {@code put} to the quota table; the table handle is closed afterwards. */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  /** Applies {@code delete} to the quota table; the table handle is closed afterwards. */
  private static void doDelete(final Connection connection, final Delete delete)
      throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /*
   * =========================================================================
   * Data Size Helpers
   */

  /** Returns the sum of {@link KeyValueUtil#length(Cell)} over every cell of the mutation. */
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        size += KeyValueUtil.length(cell);
      }
    }
    return size;
  }

  /** Returns the sum of {@link KeyValueUtil#length(Cell)} over every raw cell of the result. */
  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += KeyValueUtil.length(cell);
    }
    return size;
  }

  /** Returns the combined size of all results, as measured by the single-result overload. */
  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      size += calculateResultSize(result);
    }
    return size;
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java new file mode 100644 index 0000000..5b81269 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple rate limiter. 
Usage Example: RateLimiter limiter = new RateLimiter(); // At this point you + * have a unlimited resource limiter limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec long + * lastTs = 0; // You need to keep track of the last update timestamp while (true) { long now = + * System.currentTimeMillis(); // call canExecute before performing resource consuming operation + * bool canExecute = limiter.canExecute(now, lastTs); // If there are no available resources, wait + * until one is available if (!canExecute) Thread.sleep(limiter.waitInterval()); // ...execute the + * work and consume the resource... limiter.consume(); } + */ [email protected] [email protected] +public class RateLimiter { + private long tunit = 1000; // Timeunit factor for translating to ms. + private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. + private long avail = Long.MAX_VALUE; // Currently available resource units + + public RateLimiter() { + } + + /** + * Set the RateLimiter max available resources and refill period. + * @param limit The max value available resource units can be refilled to. + * @param timeUnit Timeunit factor for translating to ms. 
+ */ + public synchronized void set(final long limit, final TimeUnit timeUnit) { + switch (timeUnit) { + case NANOSECONDS: + throw new RuntimeException("Unsupported NANOSECONDS TimeUnit"); + case MICROSECONDS: + throw new RuntimeException("Unsupported MICROSECONDS TimeUnit"); + case MILLISECONDS: + tunit = 1; + break; + case SECONDS: + tunit = 1000; + break; + case MINUTES: + tunit = 60 * 1000; + break; + case HOURS: + tunit = 60 * 60 * 1000; + break; + case DAYS: + tunit = 24 * 60 * 60 * 1000; + break; + default: + throw new RuntimeException("Invalid TimeUnit " + timeUnit); + } + this.limit = limit; + this.avail = limit; + } + + public String toString() { + if (limit == Long.MAX_VALUE) { + return "RateLimiter(Bypass)"; + } + return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")"; + } + + /** + * Sets the current instance of RateLimiter to a new values. if current limit is smaller than the + * new limit, bump up the available resources. Otherwise allow clients to use up the previously + * available resources. + */ + public synchronized void update(final RateLimiter other) { + this.tunit = other.tunit; + if (this.limit < other.limit) { + this.avail += (other.limit - this.limit); + } + this.limit = other.limit; + } + + public synchronized boolean isBypass() { + return limit == Long.MAX_VALUE; + } + + public synchronized long getLimit() { + return limit; + } + + public synchronized long getAvailable() { + return avail; + } + + /** + * given the time interval, is there at least one resource available to allow execution? + * @param now the current timestamp + * @param lastTs the timestamp of the last update + * @return true if there is at least one resource available, otherwise false + */ + public boolean canExecute(final long now, final long lastTs) { + return canExecute(now, lastTs, 1); + } + + /** + * given the time interval, are there enough available resources to allow execution? 
+ * @param now the current timestamp + * @param lastTs the timestamp of the last update + * @param amount the number of required resources + * @return true if there are enough available resources, otherwise false + */ + public synchronized boolean canExecute(final long now, final long lastTs, final long amount) { + return avail >= amount ? true : refill(now, lastTs) >= amount; + } + + /** + * consume one available unit. + */ + public void consume() { + consume(1); + } + + /** + * consume amount available units. + * @param amount the number of units to consume + */ + public synchronized void consume(final long amount) { + this.avail -= amount; + } + + /** + * @return estimate of the ms required to wait before being able to provide 1 resource. + */ + public long waitInterval() { + return waitInterval(1); + } + + /** + * @return estimate of the ms required to wait before being able to provide "amount" resources. + */ + public synchronized long waitInterval(final long amount) { + // TODO Handle over quota? + return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit); + } + + /** + * given the specified time interval, refill the avilable units to the proportionate to elapsed + * time or to the prespecified limit. 
+ */ + private long refill(final long now, final long lastTs) { + long delta = (limit * (now - lastTs)) / tunit; + if (delta > 0) { + avail = Math.min(limit, avail + delta); + } + return avail; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java new file mode 100644 index 0000000..71b452a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Region Server Quota Manager. It is responsible to provide access to the quota information of each + * user/table. The direct user of this class is the RegionServer that will get and check the + * user/table quota for each operation (put, get, scan). For system tables and user/table with a + * quota specified, the quota check will be a noop. 
+ */ [email protected] [email protected] +public class RegionServerQuotaManager { + private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class); + + private final RegionServerServices rsServices; + + private QuotaCache quotaCache = null; + + public RegionServerQuotaManager(final RegionServerServices rsServices) { + this.rsServices = rsServices; + } + + public void start(final RpcScheduler rpcScheduler) throws IOException { + if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { + LOG.info("Quota support disabled"); + return; + } + + LOG.info("Initializing quota support"); + + // Initialize quota cache + quotaCache = new QuotaCache(rsServices); + quotaCache.start(); + } + + public void stop() { + if (isQuotaEnabled()) { + quotaCache.stop("shutdown"); + } + } + + public boolean isQuotaEnabled() { + return quotaCache != null; + } + + @VisibleForTesting + QuotaCache getQuotaCache() { + return quotaCache; + } + + /** + * Returns the quota for an operation. + * @param ugi the user that is executing the operation + * @param table the table where the operation will be executed + * @return the OperationQuota + */ + public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { + if (isQuotaEnabled() && !table.isSystemTable()) { + UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); + QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); + boolean useNoop = userLimiter.isBypass(); + if (userQuotaState.hasBypassGlobals()) { + if (LOG.isTraceEnabled()) { + LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); + } + if (!useNoop) { + return new DefaultOperationQuota(userLimiter); + } + } else { + QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); + QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); + useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass(); + if (LOG.isTraceEnabled()) { + LOG.trace("get quota 
for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter + + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); + } + if (!useNoop) { + return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); + } + } + } + return NoopOperationQuota.get(); + } + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. + * @param region the region where the operation will be performed + * @param type the operation type + * @return the OperationQuota + * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. + */ + public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type) + throws IOException, ThrottlingException { + switch (type) { + case SCAN: + return checkQuota(region, 0, 0, 1); + case GET: + return checkQuota(region, 0, 1, 0); + case MUTATE: + return checkQuota(region, 1, 0, 0); + default: + throw new RuntimeException("Invalid operation type: " + type); + } + } + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. + * @param region the region where the operation will be performed + * @param actions the "multi" actions to perform + * @return the OperationQuota + * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. + */ + public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions) + throws IOException, ThrottlingException { + int numWrites = 0; + int numReads = 0; + for (final ClientProtos.Action action : actions) { + if (action.hasMutation()) { + numWrites++; + } else if (action.hasGet()) { + numReads++; + } + } + return checkQuota(region, numWrites, numReads, 0); + } + + /** + * Check the quota for the current (rpc-context) user. 
Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. + * @param region the region where the operation will be performed + * @param numWrites number of writes to perform + * @param numReads number of short-reads to perform + * @param numScans number of scan to perform + * @return the OperationQuota + * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. + */ + private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads, + final int numScans) throws IOException, ThrottlingException { + User user = RpcServer.getRequestUser(); + UserGroupInformation ugi; + if (user != null) { + ugi = user.getUGI(); + } else { + ugi = User.getCurrent().getUGI(); + } + TableName table = region.getTableDesc().getTableName(); + + OperationQuota quota = getQuota(ugi, table); + try { + quota.checkQuota(numWrites, numReads, numScans); + } catch (ThrottlingException e) { + LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + + " numWrites=" + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + + e.getMessage()); + throw e; + } + return quota; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java new file mode 100644 index 0000000..4e31f82 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota; +import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize; +import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Simple time based limiter that checks the quota Throttle + */ [email protected] [email protected] +public final class TimeBasedLimiter implements QuotaLimiter { + private long writeLastTs = 0; + private long readLastTs = 0; + + private RateLimiter reqsLimiter = new RateLimiter(); + private RateLimiter reqSizeLimiter = new RateLimiter(); + private RateLimiter writeReqsLimiter = new RateLimiter(); + private RateLimiter writeSizeLimiter = new RateLimiter(); + private RateLimiter readReqsLimiter = new RateLimiter(); + private RateLimiter readSizeLimiter = new RateLimiter(); + private AvgOperationSize avgOpSize = new AvgOperationSize(); + + private TimeBasedLimiter() { + } + + static QuotaLimiter fromThrottle(final Throttle throttle) { + TimeBasedLimiter limiter = new TimeBasedLimiter(); + boolean isBypass = true; + if 
(throttle.hasReqNum()) { + setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum()); + isBypass = false; + } + + if (throttle.hasReqSize()) { + setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize()); + isBypass = false; + } + + if (throttle.hasWriteNum()) { + setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum()); + isBypass = false; + } + + if (throttle.hasWriteSize()) { + setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize()); + isBypass = false; + } + + if (throttle.hasReadNum()) { + setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum()); + isBypass = false; + } + + if (throttle.hasReadSize()) { + setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize()); + isBypass = false; + } + return isBypass ? NoopQuotaLimiter.get() : limiter; + } + + public void update(final TimeBasedLimiter other) { + reqsLimiter.update(other.reqsLimiter); + reqSizeLimiter.update(other.reqSizeLimiter); + writeReqsLimiter.update(other.writeReqsLimiter); + writeSizeLimiter.update(other.writeSizeLimiter); + readReqsLimiter.update(other.readReqsLimiter); + readSizeLimiter.update(other.readSizeLimiter); + } + + private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) { + limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit())); + } + + @Override + public void checkQuota(long writeSize, long readSize) throws ThrottlingException { + long now = EnvironmentEdgeManager.currentTime(); + long lastTs = Math.max(readLastTs, writeLastTs); + + if (!reqsLimiter.canExecute(now, lastTs)) { + ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); + } + if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) { + ThrottlingException.throwNumRequestsExceeded(reqSizeLimiter + .waitInterval(writeSize + readSize)); + } + + if (writeSize > 0) { + if (!writeReqsLimiter.canExecute(now, writeLastTs)) { + 
ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); + } + if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) { + ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); + } + } + + if (readSize > 0) { + if (!readReqsLimiter.canExecute(now, readLastTs)) { + ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); + } + if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) { + ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); + } + } + } + + @Override + public void grabQuota(long writeSize, long readSize) { + assert writeSize != 0 || readSize != 0; + + long now = EnvironmentEdgeManager.currentTime(); + + reqsLimiter.consume(1); + reqSizeLimiter.consume(writeSize + readSize); + + if (writeSize > 0) { + writeReqsLimiter.consume(1); + writeSizeLimiter.consume(writeSize); + writeLastTs = now; + } + if (readSize > 0) { + readReqsLimiter.consume(1); + readSizeLimiter.consume(readSize); + readLastTs = now; + } + } + + @Override + public void consumeWrite(final long size) { + reqSizeLimiter.consume(size); + writeSizeLimiter.consume(size); + } + + @Override + public void consumeRead(final long size) { + reqSizeLimiter.consume(size); + readSizeLimiter.consume(size); + } + + @Override + public boolean isBypass() { + return false; + } + + @Override + public long getWriteAvailable() { + return writeSizeLimiter.getAvailable(); + } + + @Override + public long getReadAvailable() { + return readSizeLimiter.getAvailable(); + } + + @Override + public void addOperationSize(OperationType type, long size) { + avgOpSize.addOperationSize(type, size); + } + + @Override + public long getAvgOperationSize(OperationType type) { + return avgOpSize.getAvgOperationSize(type); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("TimeBasedLimiter("); + if (!reqsLimiter.isBypass()) builder.append("reqs=" + 
reqsLimiter); + if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter); + if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter); + if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter); + if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter); + if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter); + builder.append(')'); + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java new file mode 100644 index 0000000..eb201cb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.quotas; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * In-Memory state of the user quotas + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class UserQuotaState extends QuotaState { + private Map<String, QuotaLimiter> namespaceLimiters = null; + private Map<TableName, QuotaLimiter> tableLimiters = null; + private boolean bypassGlobals = false; + + public UserQuotaState() { + super(); + } + + public UserQuotaState(final long updateTs) { + super(updateTs); + } + + @Override + public synchronized String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("UserQuotaState(ts=" + getLastUpdate()); + if (bypassGlobals) builder.append(" bypass-globals"); + + if (isBypass()) { + builder.append(" bypass"); + } else { + if (getGlobalLimiterWithoutUpdatingLastQuery() != NoopQuotaLimiter.get()) { + builder.append(" global-limiter"); + } + + if (tableLimiters != null && !tableLimiters.isEmpty()) { + builder.append(" ["); + for (TableName table : tableLimiters.keySet()) { + builder.append(" " + table); + } + builder.append(" ]"); + } + + if (namespaceLimiters != null && !namespaceLimiters.isEmpty()) { + builder.append(" ["); + for (String ns : namespaceLimiters.keySet()) { + builder.append(" " + ns); + } + builder.append(" ]"); + } + } + builder.append(')'); + return builder.toString(); + } + + /** + * @return true if there is no quota information associated to this object + */ + @Override + public synchronized boolean isBypass() { + return !bypassGlobals && getGlobalLimiterWithoutUpdatingLastQuery() == NoopQuotaLimiter.get() + && (tableLimiters == 
null || tableLimiters.isEmpty()) + && (namespaceLimiters == null || namespaceLimiters.isEmpty()); + } + + public synchronized boolean hasBypassGlobals() { + return bypassGlobals; + } + + @Override + public synchronized void setQuotas(final Quotas quotas) { + super.setQuotas(quotas); + bypassGlobals = quotas.getBypassGlobals(); + } + + /** + * Add the quota information of the specified table. (This operation is part of the QuotaState + * setup) + */ + public synchronized void setQuotas(final TableName table, Quotas quotas) { + tableLimiters = setLimiter(tableLimiters, table, quotas); + } + + /** + * Add the quota information of the specified namespace. (This operation is part of the QuotaState + * setup) + */ + public synchronized void setQuotas(final String namespace, Quotas quotas) { + namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas); + } + + private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key, + final Quotas quotas) { + if (limiters == null) { + limiters = new HashMap<K, QuotaLimiter>(); + } + + QuotaLimiter limiter = + quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(quotas.getThrottle()) : null; + if (limiter != null && !limiter.isBypass()) { + limiters.put(key, limiter); + } else { + limiters.remove(key); + } + return limiters; + } + + /** + * Perform an update of the quota state based on the other quota state object. 
(This operation is + * executed by the QuotaCache) + */ + @Override + public synchronized void update(final QuotaState other) { + super.update(other); + + if (other instanceof UserQuotaState) { + UserQuotaState uOther = (UserQuotaState) other; + tableLimiters = updateLimiters(tableLimiters, uOther.tableLimiters); + namespaceLimiters = updateLimiters(namespaceLimiters, uOther.namespaceLimiters); + bypassGlobals = uOther.bypassGlobals; + } else { + tableLimiters = null; + namespaceLimiters = null; + bypassGlobals = false; + } + } + + private static <K> Map<K, QuotaLimiter> updateLimiters(final Map<K, QuotaLimiter> map, + final Map<K, QuotaLimiter> otherMap) { + if (map == null) { + return otherMap; + } + + if (otherMap != null) { + // To Remove + Set<K> toRemove = new HashSet<K>(map.keySet()); + toRemove.removeAll(otherMap.keySet()); + map.keySet().removeAll(toRemove); + + // To Update/Add + for (final Map.Entry<K, QuotaLimiter> entry : otherMap.entrySet()) { + QuotaLimiter limiter = map.get(entry.getKey()); + if (limiter == null) { + limiter = entry.getValue(); + } else { + limiter = QuotaLimiterFactory.update(limiter, entry.getValue()); + } + map.put(entry.getKey(), limiter); + } + return map; + } + return null; + } + + /** + * Return the limiter for the specified table associated with this quota. If the table does not + * have its own quota limiter the global one will be returned. In case there is no quota limiter + * associated with this object a noop limiter will be returned. 
+ * @return the quota limiter for the specified table + */ + public synchronized QuotaLimiter getTableLimiter(final TableName table) { + setLastQuery(EnvironmentEdgeManager.currentTime()); + if (tableLimiters != null) { + QuotaLimiter limiter = tableLimiters.get(table); + if (limiter != null) return limiter; + } + if (namespaceLimiters != null) { + QuotaLimiter limiter = namespaceLimiters.get(table.getNamespaceAsString()); + if (limiter != null) return limiter; + } + return getGlobalLimiterWithoutUpdatingLastQuery(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index af457ec..b7053cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; +import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; @@ -431,6 +432,8 @@ public class HRegionServer extends HasThread implements private RegionServerCoprocessorHost rsHost; private 
RegionServerProcedureManagerHost rspmHost; + + private RegionServerQuotaManager rsQuotaManager; // Table level lock manager for locking for region operations protected TableLockManager tableLockManager; @@ -825,6 +828,9 @@ public class HRegionServer extends HasThread implements nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); } + // Setup the Quota Manager + rsQuotaManager = new RegionServerQuotaManager(this); + // Setup RPC client for master communication rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( rpcServices.isa.getAddress(), 0)); @@ -891,6 +897,9 @@ public class HRegionServer extends HasThread implements // since the server is ready to run rspmHost.start(); } + + // Start the Quota Manager + rsQuotaManager.start(getRpcServer().getScheduler()); // We registered with the Master. Go into run mode. long lastMsg = System.currentTimeMillis(); @@ -976,6 +985,11 @@ public class HRegionServer extends HasThread implements if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true); if (this.storefileRefresher != null) this.storefileRefresher.cancel(true); + // Stop the quota manager + if (rsQuotaManager != null) { + rsQuotaManager.stop(); + } + // Stop the snapshot and other procedure handlers, forcefully killing all running tasks if (rspmHost != null) { rspmHost.stop(this.abortRequested || this.killed); @@ -2486,6 +2500,11 @@ public class HRegionServer extends HasThread implements public ChoreService getChoreService() { return choreService; } + + @Override + public RegionServerQuotaManager getRegionServerQuotaManager() { + return rsQuotaManager; + } // // Main program and support routines @@ -2604,6 +2623,22 @@ public class HRegionServer extends HasThread implements } return tableRegions; } + + /** + * Gets the online tables in this RS. + * This method looks at the in-memory onlineRegions. 
+ * @return all the online tables in this RS + */ + @Override + public Set<TableName> getOnlineTables() { + Set<TableName> tables = new HashSet<TableName>(); + synchronized (this.onlineRegions) { + for (Region region: this.onlineRegions.values()) { + tables.add(region.getTableDesc().getTableName()); + } + } + return tables; + } // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). public String[] getRegionServerCoprocessors() { http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9b47c75..6dbf684 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -152,6 +152,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; @@ -448,10 +450,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * bypassed as indicated by RegionObserver, null otherwise * @throws IOException */ - private Result append(final Region region, final MutationProto m, + private Result append(final 
Region region, final OperationQuota quota, final MutationProto m, final CellScanner cellScanner, long nonceGroup) throws IOException { long before = EnvironmentEdgeManager.currentTime(); Append append = ProtobufUtil.toAppend(m, cellScanner); + quota.addMutation(append); Result r = null; if (region.getCoprocessorHost() != null) { r = region.getCoprocessorHost().preAppend(append); @@ -484,10 +487,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @return the Result * @throws IOException */ - private Result increment(final Region region, final MutationProto mutation, - final CellScanner cells, long nonceGroup) throws IOException { + private Result increment(final Region region, final OperationQuota quota, + final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException { long before = EnvironmentEdgeManager.currentTime(); Increment increment = ProtobufUtil.toIncrement(mutation, cells); + quota.addMutation(increment); Result r = null; if (region.getCoprocessorHost() != null) { r = region.getCoprocessorHost().preIncrement(increment); @@ -524,7 +528,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @return Return the <code>cellScanner</code> passed */ private List<CellScannable> doNonAtomicRegionMutation(final Region region, - final RegionAction actions, final CellScanner cellScanner, + final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) { // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do // one at a time, we instead pass them in batch. Be aware that the corresponding @@ -557,15 +561,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. 
- doBatchOp(builder, region, mutations, cellScanner); + doBatchOp(builder, region, quota, mutations, cellScanner); mutations.clear(); } switch (type) { case APPEND: - r = append(region, action.getMutation(), cellScanner, nonceGroup); + r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); break; case INCREMENT: - r = increment(region, action.getMutation(), cellScanner, nonceGroup); + r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); break; case PUT: case DELETE: @@ -610,7 +614,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Finish up any outstanding mutations if (mutations != null && !mutations.isEmpty()) { - doBatchOp(builder, region, mutations, cellScanner); + doBatchOp(builder, region, quota, mutations, cellScanner); } return cellsToReturn; } @@ -623,6 +627,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param mutations */ private void doBatchOp(final RegionActionResult.Builder builder, final Region region, + final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells) { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTime(); @@ -640,6 +645,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, batchContainsDelete = true; } mArray[i++] = mutation; + quota.addMutation(mutation); } if (!region.getRegionInfo().isMetaTable()) { @@ -893,6 +899,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Configuration getConfiguration() { return regionServer.getConfiguration(); } + + private RegionServerQuotaManager getQuotaManager() { + return regionServer.getRegionServerQuotaManager(); + } void start() { rpcServer.start(); @@ -1813,6 +1823,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public GetResponse get(final RpcController controller, final GetRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); + OperationQuota quota 
= null; try { checkOpen(); requestCount.increment(); @@ -1822,6 +1833,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ClientProtos.Get get = request.getGet(); Boolean existence = null; Result r = null; + quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { if (get.getColumnCount() != 1) { @@ -1856,6 +1868,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ClientProtos.Result pbr = ProtobufUtil.toResult(r); builder.setResult(pbr); } + if (r != null) { + quota.addGetResult(r); + } return builder.build(); } catch (IOException ie) { throw new ServiceException(ie); @@ -1864,6 +1879,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.metricsRegionServer.updateGet( EnvironmentEdgeManager.currentTime() - before); } + if (quota != null) { + quota.close(); + } } } @@ -1899,10 +1917,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, for (RegionAction regionAction : request.getRegionActionList()) { this.requestCount.add(regionAction.getActionCount()); + OperationQuota quota; Region region; regionActionResultBuilder.clear(); try { region = getRegion(regionAction.getRegion()); + quota = getQuotaManager().checkQuota(region, regionAction.getActionList()); } catch (IOException e) { regionActionResultBuilder.setException(ResponseConverter.buildException(e)); responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); @@ -1939,10 +1959,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } else { // doNonAtomicRegionMutation manages the exception internally - cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner, + cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, regionActionResultBuilder, cellsToReturn, nonceGroup); } responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); + quota.close(); } // Load the controller with the 
Cells to return. if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { @@ -1966,6 +1987,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // It is also the conduit via which we pass back data. PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; CellScanner cellScanner = controller != null? controller.cellScanner(): null; + OperationQuota quota = null; // Clear scanner so we are not holding on to reference across call. if (controller != null) controller.setCellScanner(null); try { @@ -1981,17 +2003,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Result r = null; Boolean processed = null; MutationType type = mutation.getMutateType(); + long mutationSize = 0; + quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); switch (type) { case APPEND: // TODO: this doesn't actually check anything. - r = append(region, mutation, cellScanner, nonceGroup); + r = append(region, quota, mutation, cellScanner, nonceGroup); break; case INCREMENT: // TODO: this doesn't actually check anything. 
- r = increment(region, mutation, cellScanner, nonceGroup); + r = increment(region, quota, mutation, cellScanner, nonceGroup); break; case PUT: Put put = ProtobufUtil.toPut(mutation, cellScanner); + quota.addMutation(put); if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); @@ -2020,6 +2045,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; case DELETE: Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + quota.addMutation(delete); if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); @@ -2056,6 +2082,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } catch (IOException ie) { regionServer.checkFileSystem(); throw new ServiceException(ie); + } finally { + if (quota != null) { + quota.close(); + } } } @@ -2069,6 +2099,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @Override public ScanResponse scan(final RpcController controller, final ScanRequest request) throws ServiceException { + OperationQuota quota = null; Leases.Lease lease = null; String scannerName = null; try { @@ -2162,6 +2193,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ttl = this.scannerLeaseTimeoutPeriod; } + quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); if (rows > 0) { // if nextCallSeq does not match throw Exception straight away. This needs to be // performed even before checking of Lease. 
@@ -2207,9 +2240,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } if (!done) { - long maxResultSize = scanner.getMaxResultSize(); + long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); if (maxResultSize <= 0) { - maxResultSize = maxScannerResultSize; + maxResultSize = maxQuotaResultSize; } List<Cell> values = new ArrayList<Cell>(); region.startRegionOperation(Operation.SCAN); @@ -2302,6 +2335,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); } } + + quota.addScanResult(results); // If the scanner's filter - if any - is done with the scan // and wants to tell the client to stop the scan. This is done by passing @@ -2362,6 +2397,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } throw new ServiceException(ie); + } finally { + if (quota != null) { + quota.close(); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index aba09ba..de99451 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; 
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.zookeeper.KeeperException; import com.google.protobuf.Service; @@ -70,6 +73,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * @return RegionServer's instance of {@link TableLockManager} */ TableLockManager getTableLockManager(); + + /** + * @return RegionServer's instance of {@link RegionServerQuotaManager} + */ + RegionServerQuotaManager getRegionServerQuotaManager(); /** * Tasks to perform after region open to complete deploy of region on @@ -148,4 +156,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() */ double getCompactionPressure(); + + /** + * @return all the online tables in this RS + */ + Set<TableName> getOnlineTables(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index e0f6be1..ae57738 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; 
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -2464,4 +2465,34 @@ public class AccessController extends BaseMasterAndRegionObserver public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx, List<WALEntry> entries, CellScanner cells) throws IOException { } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + requirePermission("setUserQuota", Action.ADMIN); + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN); + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + requirePermission("setUserNamespaceQuota", Action.ADMIN); + } + + @Override + public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + requirePermission("setTableQuota", tableName, null, null, Action.ADMIN); + } + + @Override + public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException { 
+ requirePermission("setNamespaceQuota", Action.ADMIN); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java index 8d1664b..4a93151 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java @@ -90,6 +90,7 @@ public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements public E poll() { E elem = objects[head]; + objects[head] = null; head = (head + 1) % objects.length; if (head == 0) tail = 0; return elem; http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 6a1539a..d6112db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; import 
org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; @@ -100,6 +102,11 @@ public class MockRegionServerServices implements RegionServerServices { public List<Region> getOnlineRegions(TableName tableName) throws IOException { return null; } + + @Override + public Set<TableName> getOnlineTables() { + return null; + } @Override public void addToOnlineRegions(Region r) { @@ -168,6 +175,11 @@ public class MockRegionServerServices implements RegionServerServices { public TableLockManager getTableLockManager() { return new NullTableLockManager(); } + + @Override + public RegionServerQuotaManager getRegionServerQuotaManager() { + return null; + } @Override public ServerName getServerName() { http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java index c85ba83..ba5ca2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest; +import 
org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -1092,6 +1093,56 @@ public class TestMasterObserver { public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName) throws IOException { } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final 
String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException { + } } private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 2aa64e9..96c12aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -92,6 +93,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -326,6 +328,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public TableLockManager getTableLockManager() { return new NullTableLockManager(); } + + @Override + public RegionServerQuotaManager getRegionServerQuotaManager() { + return null; + } 
@Override public void postOpenDeployTasks(Region r) throws KeeperException, IOException { @@ -528,6 +535,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { // TODO Auto-generated method stub return null; } + + @Override + public Set<TableName> getOnlineTables() { + return null; + } @Override public Leases getLeases() { http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index b88c747..4e6b01f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; +import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -250,6 +251,11 @@ public class TestCatalogJanitor { public MasterCoprocessorHost getMasterCoprocessorHost() { return null; } + + @Override + public MasterQuotaManager getMasterQuotaManager() { + return null; + } @Override public ServerManager getServerManager() { http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java new file mode 100644 index 0000000..294f643 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * minicluster tests that validate that quota entries are properly set in the quota table + */ +@Category({ MediumTests.class }) +public class TestQuotaAdmin { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testSimpleScan() throws Exception { + Admin admin = TEST_UTIL.getHBaseAdmin(); + String userName = User.getCurrent().getShortName(); + + 
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 6, + TimeUnit.MINUTES)); + admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true)); + + QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration()); + try { + int countThrottle = 0; + int countGlobalBypass = 0; + for (QuotaSettings settings : scanner) { + LOG.debug(settings); + switch (settings.getQuotaType()) { + case THROTTLE: + ThrottleSettings throttle = (ThrottleSettings) settings; + assertEquals(userName, throttle.getUserName()); + assertEquals(null, throttle.getTableName()); + assertEquals(null, throttle.getNamespace()); + assertEquals(6, throttle.getSoftLimit()); + assertEquals(TimeUnit.MINUTES, throttle.getTimeUnit()); + countThrottle++; + break; + case GLOBAL_BYPASS: + countGlobalBypass++; + break; + default: + fail("unexpected settings type: " + settings.getQuotaType()); + } + } + assertEquals(1, countThrottle); + assertEquals(1, countGlobalBypass); + } finally { + scanner.close(); + } + + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + assertNumResults(1, null); + admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false)); + assertNumResults(0, null); + } + + @Test + public void testQuotaRetrieverFilter() throws Exception { + Admin admin = TEST_UTIL.getHBaseAdmin(); + TableName[] tables = + new TableName[] { TableName.valueOf("T0"), TableName.valueOf("T01"), + TableName.valueOf("NS0:T2"), }; + String[] namespaces = new String[] { "NS0", "NS01", "NS2" }; + String[] users = new String[] { "User0", "User01", "User2" }; + + for (String user : users) { + admin.setQuota(QuotaSettingsFactory.throttleUser(user, ThrottleType.REQUEST_NUMBER, 1, + TimeUnit.MINUTES)); + + for (TableName table : tables) { + admin.setQuota(QuotaSettingsFactory.throttleUser(user, table, ThrottleType.REQUEST_NUMBER, + 2, TimeUnit.MINUTES)); + } + + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.throttleUser(user, ns, 
ThrottleType.REQUEST_NUMBER, 3, + TimeUnit.MINUTES)); + } + } + assertNumResults(21, null); + + for (TableName table : tables) { + admin.setQuota(QuotaSettingsFactory.throttleTable(table, ThrottleType.REQUEST_NUMBER, 4, + TimeUnit.MINUTES)); + } + assertNumResults(24, null); + + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.throttleNamespace(ns, ThrottleType.REQUEST_NUMBER, 5, + TimeUnit.MINUTES)); + } + assertNumResults(27, null); + + assertNumResults(7, new QuotaFilter().setUserFilter("User0")); + assertNumResults(0, new QuotaFilter().setUserFilter("User")); + assertNumResults(21, new QuotaFilter().setUserFilter("User.*")); + assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0")); + assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("NS.*")); + assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setTableFilter("T")); + assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T.*")); + assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS0")); + assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS")); + assertNumResults(9, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS.*")); + assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0") + .setNamespaceFilter("NS0")); + assertNumResults(1, new QuotaFilter().setTableFilter("T0")); + assertNumResults(0, new QuotaFilter().setTableFilter("T")); + assertNumResults(2, new QuotaFilter().setTableFilter("T.*")); + assertNumResults(3, new QuotaFilter().setTableFilter(".*T.*")); + assertNumResults(1, new QuotaFilter().setNamespaceFilter("NS0")); + assertNumResults(0, new QuotaFilter().setNamespaceFilter("NS")); + assertNumResults(3, new QuotaFilter().setNamespaceFilter("NS.*")); + + for (String user : users) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(user)); + for (TableName table : tables) { + 
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, table)); + } + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, ns)); + } + } + assertNumResults(6, null); + + for (TableName table : tables) { + admin.setQuota(QuotaSettingsFactory.unthrottleTable(table)); + } + assertNumResults(3, null); + + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(ns)); + } + assertNumResults(0, null); + } + + private void assertNumResults(int expected, final QuotaFilter filter) throws Exception { + assertEquals(expected, countResults(filter)); + } + + private int countResults(final QuotaFilter filter) throws Exception { + QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter); + try { + int count = 0; + for (QuotaSettings settings : scanner) { + LOG.debug(settings); + count++; + } + return count; + } finally { + scanner.close(); + } + } +} \ No newline at end of file
