HBASE-13205 [branch-1] Backport HBASE-11598 Add simple rpc throttling (Ashish Singhi)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c031d8de Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c031d8de Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c031d8de Branch: refs/heads/branch-1 Commit: c031d8de2391b1349de05c8d439224dab1bde978 Parents: bbdd50b Author: tedyu <[email protected]> Authored: Tue Apr 7 18:29:03 2015 -0700 Committer: tedyu <[email protected]> Committed: Tue Apr 7 18:29:03 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 18 + .../hadoop/hbase/client/ConnectionManager.java | 16 +- .../apache/hadoop/hbase/client/HBaseAdmin.java | 30 + .../hadoop/hbase/protobuf/ProtobufUtil.java | 162 + .../quotas/InvalidQuotaSettingsException.java | 25 + .../hbase/quotas/QuotaExceededException.java | 27 + .../apache/hadoop/hbase/quotas/QuotaFilter.java | 103 + .../hadoop/hbase/quotas/QuotaRetriever.java | 178 + .../apache/hadoop/hbase/quotas/QuotaScope.java | 35 + .../hadoop/hbase/quotas/QuotaSettings.java | 123 + .../hbase/quotas/QuotaSettingsFactory.java | 253 + .../hadoop/hbase/quotas/QuotaTableUtil.java | 408 ++ .../apache/hadoop/hbase/quotas/QuotaType.java | 23 + .../hadoop/hbase/quotas/ThrottleSettings.java | 100 + .../hadoop/hbase/quotas/ThrottleType.java | 34 + .../hbase/quotas/ThrottlingException.java | 146 + .../org/apache/hadoop/hbase/util/Bytes.java | 18 + .../org/apache/hadoop/hbase/util/Sleeper.java | 7 + hbase-protocol/pom.xml | 1 + .../hbase/protobuf/generated/HBaseProtos.java | 135 +- .../hbase/protobuf/generated/MasterProtos.java | 2328 +++++++++- .../hbase/protobuf/generated/QuotaProtos.java | 4378 ++++++++++++++++++ hbase-protocol/src/main/protobuf/HBase.proto | 10 + hbase-protocol/src/main/protobuf/Master.proto | 18 + hbase-protocol/src/main/protobuf/Quota.proto | 73 + .../BaseMasterAndRegionObserver.java | 51 + .../hbase/coprocessor/BaseMasterObserver.java | 50 + .../hbase/coprocessor/MasterObserver.java | 105 + .../org/apache/hadoop/hbase/ipc/RpcServer.java | 1 + .../hadoop/hbase/ipc/RpcServerInterface.java | 2 + .../org/apache/hadoop/hbase/master/HMaster.java | 17 + .../hbase/master/MasterCoprocessorHost.java | 106 + .../hadoop/hbase/master/MasterRpcServices.java | 12 + .../hadoop/hbase/master/MasterServices.java | 7 + .../hbase/quotas/DefaultOperationQuota.java | 135 + .../hadoop/hbase/quotas/MasterQuotaManager.java | 441 ++ .../hadoop/hbase/quotas/NoopOperationQuota.java | 76 + .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 82 + .../hadoop/hbase/quotas/OperationQuota.java | 120 + .../apache/hadoop/hbase/quotas/QuotaCache.java | 312 ++ .../hadoop/hbase/quotas/QuotaLimiter.java | 80 + .../hbase/quotas/QuotaLimiterFactory.java | 37 + .../apache/hadoop/hbase/quotas/QuotaState.java | 114 + .../apache/hadoop/hbase/quotas/QuotaUtil.java | 297 ++ .../apache/hadoop/hbase/quotas/RateLimiter.java | 165 + .../hbase/quotas/RegionServerQuotaManager.java | 189 + .../hadoop/hbase/quotas/TimeBasedLimiter.java | 194 + .../hadoop/hbase/quotas/UserQuotaState.java | 193 + .../hbase/regionserver/HRegionServer.java | 35 + .../hbase/regionserver/RSRpcServices.java | 65 +- .../regionserver/RegionServerServices.java | 13 + .../hbase/security/access/AccessController.java | 31 + .../util/BoundedPriorityBlockingQueue.java | 1 + .../hadoop/hbase/MockRegionServerServices.java | 12 + .../hbase/coprocessor/TestMasterObserver.java | 51 + .../hadoop/hbase/master/MockRegionServer.java | 12 + .../hadoop/hbase/master/TestCatalogJanitor.java | 6 + .../hadoop/hbase/quotas/TestQuotaAdmin.java | 199 + .../hadoop/hbase/quotas/TestQuotaState.java | 221 + .../hadoop/hbase/quotas/TestQuotaTableUtil.java | 196 + .../hadoop/hbase/quotas/TestQuotaThrottle.java | 409 ++ .../hadoop/hbase/quotas/TestRateLimiter.java | 105 + .../security/access/TestAccessController.java | 63 + hbase-shell/src/main/ruby/hbase.rb | 10 + hbase-shell/src/main/ruby/hbase/hbase.rb | 5 + hbase-shell/src/main/ruby/hbase/quotas.rb | 216 + hbase-shell/src/main/ruby/shell.rb | 13 + hbase-shell/src/main/ruby/shell/commands.rb | 4 + .../src/main/ruby/shell/commands/list_quotas.rb | 52 + .../src/main/ruby/shell/commands/set_quota.rb | 70 + 70 files changed, 12982 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 003dda7..4f89467 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaRetriever; +import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; @@ -1281,6 +1284,21 @@ public interface Admin extends Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ void deleteSnapshots(final Pattern pattern) throws IOException; + + /** + * Apply the new quota settings. + * @param quota the quota settings + * @throws IOException if a remote or network exception occurs + */ + void setQuota(final QuotaSettings quota) throws IOException; + + /** + * Return a QuotaRetriever to list the quotas based on the filter. + * @param filter the quota settings filter + * @return the quota retriever + * @throws IOException if a remote or network exception occurs + */ + QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException; /** * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the active http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 900414b..c1e9644 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -18,9 +18,6 @@ */ package org.apache.hadoop.hbase.client; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -45,11 +42,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -164,6 +156,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanReq import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; @@ -1996,6 +1990,12 @@ class ConnectionManager { throws ServiceException { return stub.getClusterStatus(controller, request); } + + @Override + public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request) + throws ServiceException { + return stub.setQuota(controller, request); + } @Override public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 0082087..30dc6cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -131,6 +131,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaRetriever; +import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; @@ -3633,6 +3636,33 @@ public class HBaseAdmin implements Admin { } }); } + + /** + * Apply the new quota settings. + * @param quota the quota settings + * @throws IOException if a remote or network exception occurs + */ + @Override + public void setQuota(final QuotaSettings quota) throws IOException { + executeCallable(new MasterCallable<Void>(getConnection()) { + @Override + public Void call(int callTimeout) throws ServiceException { + this.master.setQuota(null, QuotaSettings.buildSetQuotaRequestProto(quota)); + return null; + } + }); + } + + /** + * Return a Quota Scanner to list the quotas based on the filter. + * @param filter the quota settings filter + * @return the quota scanner + * @throws IOException if a remote or network exception occurs + */ + @Override + public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException { + return QuotaRetriever.open(conf, filter); + } private <V> V executeCallable(MasterCallable<V> callable) throws IOException { RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 1c32f83..fca3ca4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -117,6 +118,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; @@ -127,6 +129,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.quotas.QuotaScope; +import org.apache.hadoop.hbase.quotas.QuotaType; +import org.apache.hadoop.hbase.quotas.ThrottleType; import org.apache.hadoop.hbase.replication.ReplicationLoadSink; import org.apache.hadoop.hbase.replication.ReplicationLoadSource; import org.apache.hadoop.hbase.security.access.Permission; @@ -2859,6 +2864,163 @@ public final class ProtobufUtil { } return result; } + + /** + * Convert a protocol buffer TimeUnit to a client TimeUnit + * @param proto + * @return the converted client TimeUnit + */ + public static TimeUnit toTimeUnit(final HBaseProtos.TimeUnit proto) { + switch (proto) { + case NANOSECONDS: + return TimeUnit.NANOSECONDS; + case MICROSECONDS: + return TimeUnit.MICROSECONDS; + case MILLISECONDS: + return TimeUnit.MILLISECONDS; + case SECONDS: + return TimeUnit.SECONDS; + case MINUTES: + return TimeUnit.MINUTES; + case HOURS: + return TimeUnit.HOURS; + case DAYS: + return TimeUnit.DAYS; + default: + throw new RuntimeException("Invalid TimeUnit " + proto); + } + } + + /** + * Convert a client TimeUnit to a protocol buffer TimeUnit + * @param timeUnit + * @return the converted protocol buffer TimeUnit + */ + public static HBaseProtos.TimeUnit toProtoTimeUnit(final TimeUnit timeUnit) { + switch (timeUnit) { + case NANOSECONDS: + return HBaseProtos.TimeUnit.NANOSECONDS; + case MICROSECONDS: + return HBaseProtos.TimeUnit.MICROSECONDS; + case MILLISECONDS: + return HBaseProtos.TimeUnit.MILLISECONDS; + case SECONDS: + return HBaseProtos.TimeUnit.SECONDS; + case MINUTES: + return HBaseProtos.TimeUnit.MINUTES; + case HOURS: + return HBaseProtos.TimeUnit.HOURS; + case DAYS: + return HBaseProtos.TimeUnit.DAYS; + default: + throw new RuntimeException("Invalid TimeUnit " + timeUnit); + } + } + + /** + * Convert a protocol buffer ThrottleType to a client ThrottleType + * @param proto + * @return the converted client ThrottleType + */ + public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) { + switch (proto) { + case REQUEST_NUMBER: + return ThrottleType.REQUEST_NUMBER; + case REQUEST_SIZE: + return ThrottleType.REQUEST_SIZE; + default: + throw new RuntimeException("Invalid ThrottleType " + proto); + } + } + + /** + * Convert a client ThrottleType to a protocol buffer ThrottleType + * @param type + * @return the converted protocol buffer ThrottleType + */ + public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) { + switch (type) { + case REQUEST_NUMBER: + return QuotaProtos.ThrottleType.REQUEST_NUMBER; + case REQUEST_SIZE: + return QuotaProtos.ThrottleType.REQUEST_SIZE; + default: + throw new RuntimeException("Invalid ThrottleType " + type); + } + } + + /** + * Convert a protocol buffer QuotaScope to a client QuotaScope + * @param proto + * @return the converted client QuotaScope + */ + public static QuotaScope toQuotaScope(final QuotaProtos.QuotaScope proto) { + switch (proto) { + case CLUSTER: + return QuotaScope.CLUSTER; + case MACHINE: + return QuotaScope.MACHINE; + default: + throw new RuntimeException("Invalid QuotaScope " + proto); + } + } + + /** + * Convert a client QuotaScope to a protocol buffer QuotaScope + * @param scope + * @return the converted protocol buffer QuotaScope + */ + public static QuotaProtos.QuotaScope toProtoQuotaScope(final QuotaScope scope) { + switch (scope) { + case CLUSTER: + return QuotaProtos.QuotaScope.CLUSTER; + case MACHINE: + return QuotaProtos.QuotaScope.MACHINE; + default: + throw new RuntimeException("Invalid QuotaScope " + scope); + } + } + + /** + * Convert a protocol buffer QuotaType to a client QuotaType + * @param proto + * @return the converted client QuotaType + */ + public static QuotaType toQuotaScope(final QuotaProtos.QuotaType proto) { + switch (proto) { + case THROTTLE: + return QuotaType.THROTTLE; + default: + throw new RuntimeException("Invalid QuotaType " + proto); + } + } + + /** + * Convert a client QuotaType to a protocol buffer QuotaType + * @param type + * @return the converted protocol buffer QuotaType + */ + public static QuotaProtos.QuotaType toProtoQuotaScope(final QuotaType type) { + switch (type) { + case THROTTLE: + return QuotaProtos.QuotaType.THROTTLE; + default: + throw new RuntimeException("Invalid QuotaType " + type); + } + } + + /** + * Build a protocol buffer TimedQuota + * @param limit the allowed number of request/data per timeUnit + * @param timeUnit the limit time unit + * @param scope the quota scope + * @return the protocol buffer TimedQuota + */ + public static QuotaProtos.TimedQuota toTimedQuota(final long limit, final TimeUnit timeUnit, + final QuotaScope scope) { + return QuotaProtos.TimedQuota.newBuilder().setSoftLimit(limit) + .setTimeUnit(toProtoTimeUnit(timeUnit)).setScope(toProtoQuotaScope(scope)).build(); + } /** * Generates a marker for the WAL so that we propagate the notion of a bulk region load http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/InvalidQuotaSettingsException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/InvalidQuotaSettingsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/InvalidQuotaSettingsException.java new file mode 100644 index 0000000..8227728 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/InvalidQuotaSettingsException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Generic quota exceeded exception for invalid settings + */ [email protected] +public class InvalidQuotaSettingsException extends DoNotRetryIOException { + public InvalidQuotaSettingsException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaExceededException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaExceededException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaExceededException.java new file mode 100644 index 0000000..a0bd029 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaExceededException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Generic quota exceeded exception + */ [email protected] [email protected] +public class QuotaExceededException extends DoNotRetryIOException { + public QuotaExceededException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaFilter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaFilter.java new file mode 100644 index 0000000..645ccda --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaFilter.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Strings; + +/** + * Filter to use to filter the QuotaRetriever results. + */ [email protected] [email protected] +public class QuotaFilter { + private Set<QuotaType> types = new HashSet<QuotaType>(); + private boolean hasFilters = false; + private String namespaceRegex; + private String tableRegex; + private String userRegex; + + public QuotaFilter() { + } + + /** + * Set the user filter regex + * @param regex the user filter + * @return the quota filter object + */ + public QuotaFilter setUserFilter(final String regex) { + this.userRegex = regex; + hasFilters |= !Strings.isEmpty(regex); + return this; + } + + /** + * Set the table filter regex + * @param regex the table filter + * @return the quota filter object + */ + public QuotaFilter setTableFilter(final String regex) { + this.tableRegex = regex; + hasFilters |= !Strings.isEmpty(regex); + return this; + } + + /** + * Set the namespace filter regex + * @param regex the namespace filter + * @return the quota filter object + */ + public QuotaFilter setNamespaceFilter(final String regex) { + this.namespaceRegex = regex; + hasFilters |= !Strings.isEmpty(regex); + return this; + } + + /** + * Add a type to the filter list + * @param type the type to filter on + * @return the quota filter object + */ + public QuotaFilter addTypeFilter(final QuotaType type) { + this.types.add(type); + hasFilters |= true; + return this; + } + + /** @return true if the filter is empty */ + public boolean isNull() { + return !hasFilters; + } + + /** @return the QuotaType types that we want to filter one */ + public Set<QuotaType> getTypeFilters() { + return types; + } + + /** @return the Namespace filter regex */ + public String getNamespaceFilter() { + return namespaceRegex; + } + + /** @return the Table filter regex */ + public String getTableFilter() { + return tableRegex; + } + + /** @return the User filter regex */ + public String getUserFilter() { + return userRegex; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java new file mode 100644 index 0000000..70e4356 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.util.StringUtils; + +/** + * Scanner to iterate over the quota settings. + */ [email protected] [email protected] +public final class QuotaRetriever implements Closeable, Iterable<QuotaSettings> { + private static final Log LOG = LogFactory.getLog(QuotaRetriever.class); + + private final Queue<QuotaSettings> cache = new LinkedList<QuotaSettings>(); + private ResultScanner scanner; + /** + * Connection to use. Could pass one in and have this class use it but this class wants to be + * standalone. + */ + private Connection connection; + private Table table; + + private QuotaRetriever() { + } + + void init(final Configuration conf, final Scan scan) throws IOException { + this.connection = ConnectionFactory.createConnection(conf); + this.table = this.connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME); + try { + scanner = table.getScanner(scan); + } catch (IOException e) { + try { + close(); + } catch (IOException ioe) { + LOG.warn("Failed getting scanner and then failed close on cleanup", e); + } + throw e; + } + } + + public void close() throws IOException { + if (this.table != null) { + this.table.close(); + this.table = null; + } + if (this.connection != null) { + this.connection.close(); + this.connection = null; + } + } + + public QuotaSettings next() throws IOException { + if (cache.isEmpty()) { + Result result = scanner.next(); + if (result == null) return null; + + QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() { + @Override + public void visitUserQuotas(String userName, Quotas quotas) { + cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas)); + } + + @Override + public void visitUserQuotas(String userName, TableName table, Quotas quotas) { + cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas)); + } + + @Override + public void visitUserQuotas(String userName, String namespace, Quotas quotas) { + cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas)); + } + + @Override + public void visitTableQuotas(TableName tableName, Quotas quotas) { + cache.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas)); + } + + @Override + public void visitNamespaceQuotas(String namespace, Quotas quotas) { + cache.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas)); + } + }); + } + return cache.poll(); + } + + @Override + public Iterator<QuotaSettings> iterator() { + return new Iter(); + } + + private class Iter implements Iterator<QuotaSettings> { + private QuotaSettings cache; + + public Iter() { + try { + cache = QuotaRetriever.this.next(); + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + + @Override + public boolean hasNext() { + return cache != null; + } + + @Override + public QuotaSettings next() { + QuotaSettings result = cache; + try { + cache = QuotaRetriever.this.next(); + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + return result; + } + + @Override + public void remove() { + throw new RuntimeException("remove() not supported"); + } + } + + /** + * Open a QuotaRetriever with no filter, all the quota settings will be returned. + * @param conf Configuration object to use. + * @return the QuotaRetriever + * @throws IOException if a remote or network exception occurs + */ + public static QuotaRetriever open(final Configuration conf) throws IOException { + return open(conf, null); + } + + /** + * Open a QuotaRetriever with the specified filter. + * @param conf Configuration object to use. + * @param filter the QuotaFilter + * @return the QuotaRetriever + * @throws IOException if a remote or network exception occurs + */ + public static QuotaRetriever open(final Configuration conf, final QuotaFilter filter) + throws IOException { + Scan scan = QuotaTableUtil.makeScan(filter); + QuotaRetriever scanner = new QuotaRetriever(); + scanner.init(conf, scan); + return scanner; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaScope.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaScope.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaScope.java new file mode 100644 index 0000000..3de399f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaScope.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Describe the Scope of the quota rules. The quota can be enforced at the cluster level or at + * machine level. + */ [email protected] [email protected] +public enum QuotaScope { + /** + * The specified throttling rules will be applied at the cluster level. A limit of 100req/min + * means 100req/min in total. If you execute 50req on a machine and then 50req on another machine + * then you have to wait your quota to fill up. + */ + CLUSTER, + + /** + * The specified throttling rules will be applied on the machine level. A limit of 100req/min + * means that each machine can execute 100req/min. + */ + MACHINE, +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettings.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettings.java new file mode 100644 index 0000000..3a8c158 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettings.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; + [email protected] [email protected] +public abstract class QuotaSettings { + private final String userName; + private final String namespace; + private final TableName tableName; + + protected QuotaSettings(final String userName, final TableName tableName, + final String namespace) { + this.userName = userName; + this.namespace = namespace; + this.tableName = tableName; + } + + public abstract QuotaType getQuotaType(); + + public String getUserName() { + return userName; + } + + public TableName getTableName() { + return tableName; + } + + public String getNamespace() { + return namespace; + } + + /** + * Convert a QuotaSettings to a protocol buffer SetQuotaRequest. This is used internally by the + * Admin client to serialize the quota settings and send them to the master. + */ + public static SetQuotaRequest buildSetQuotaRequestProto(final QuotaSettings settings) { + SetQuotaRequest.Builder builder = SetQuotaRequest.newBuilder(); + if (settings.getUserName() != null) { + builder.setUserName(settings.getUserName()); + } + if (settings.getTableName() != null) { + builder.setTableName(ProtobufUtil.toProtoTableName(settings.getTableName())); + } + if (settings.getNamespace() != null) { + builder.setNamespace(settings.getNamespace()); + } + settings.setupSetQuotaRequest(builder); + return builder.build(); + } + + /** + * Called by toSetQuotaRequestProto() the subclass should implement this method to set the + * specific SetQuotaRequest properties. + */ + protected abstract void setupSetQuotaRequest(SetQuotaRequest.Builder builder); + + protected String ownerToString() { + StringBuilder builder = new StringBuilder(); + if (userName != null) { + builder.append("USER => '"); + builder.append(userName); + builder.append("', "); + } + if (tableName != null) { + builder.append("TABLE => '"); + builder.append(tableName.toString()); + builder.append("', "); + } + if (namespace != null) { + builder.append("NAMESPACE => '"); + builder.append(namespace); + builder.append("', "); + } + return builder.toString(); + } + + protected static String sizeToString(final long size) { + if (size >= (1L << 50)) return String.format("%dP", size / (1L << 50)); + if (size >= (1L << 40)) return String.format("%dT", size / (1L << 40)); + if (size >= (1L << 30)) return String.format("%dG", size / (1L << 30)); + if (size >= (1L << 20)) return String.format("%dM", size / (1L << 20)); + if (size >= (1L << 10)) return String.format("%dK", size / (1L << 10)); + return String.format("%dB", size); + } + + protected static String timeToString(final TimeUnit timeUnit) { + switch (timeUnit) { + case NANOSECONDS: + return "nsec"; + case MICROSECONDS: + return "usec"; + case MILLISECONDS: + return "msec"; + case SECONDS: + return "sec"; + case MINUTES: + return "min"; + case HOURS: + return "hour"; + case DAYS: + return "day"; + default: + throw new RuntimeException("Invalid TimeUnit " + timeUnit); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java new file mode 100644 index 0000000..2499e06 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; + [email protected] [email protected] +public class QuotaSettingsFactory { + private QuotaSettingsFactory() { + // Utility class + } + + static class QuotaGlobalsSettingsBypass extends QuotaSettings { + private final boolean bypassGlobals; + + QuotaGlobalsSettingsBypass(final String userName, final TableName tableName, + final String namespace, final boolean bypassGlobals) { + super(userName, tableName, namespace); + this.bypassGlobals = bypassGlobals; + } + + @Override + public QuotaType getQuotaType() { + return QuotaType.GLOBAL_BYPASS; + } + + @Override + protected void setupSetQuotaRequest(SetQuotaRequest.Builder builder) { + builder.setBypassGlobals(bypassGlobals); + } + + @Override + public String toString() { + return "GLOBAL_BYPASS => " + bypassGlobals; + } + } + + /* + * ========================================================================== QuotaSettings from + * the Quotas object + */ + static List<QuotaSettings> fromUserQuotas(final String userName, final Quotas quotas) { + return fromQuotas(userName, null, null, quotas); + } + + static List<QuotaSettings> fromUserQuotas(final String userName, final TableName tableName, + final Quotas quotas) { + return fromQuotas(userName, tableName, null, quotas); + } + + static List<QuotaSettings> fromUserQuotas(final String userName, final String namespace, + final Quotas quotas) { + return fromQuotas(userName, null, namespace, quotas); + } + + static List<QuotaSettings> fromTableQuotas(final TableName tableName, final Quotas quotas) { + return fromQuotas(null, tableName, null, quotas); + } + + static List<QuotaSettings> fromNamespaceQuotas(final String namespace, final Quotas quotas) { + return fromQuotas(null, null, namespace, quotas); + } + + private static List<QuotaSettings> fromQuotas(final String userName, final TableName tableName, + final String namespace, final Quotas quotas) { + List<QuotaSettings> settings = new ArrayList<QuotaSettings>(); + if (quotas.hasThrottle()) { + settings.addAll(fromThrottle(userName, tableName, namespace, quotas.getThrottle())); + } + if (quotas.getBypassGlobals() == true) { + settings.add(new QuotaGlobalsSettingsBypass(userName, tableName, namespace, true)); + } + return settings; + } + + private static List<QuotaSettings> fromThrottle(final String userName, final TableName tableName, + final String namespace, final QuotaProtos.Throttle throttle) { + List<QuotaSettings> settings = new ArrayList<QuotaSettings>(); + if (throttle.hasReqNum()) { + settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, + ThrottleType.REQUEST_NUMBER, throttle.getReqNum())); + } + if (throttle.hasReqSize()) { + settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, + ThrottleType.REQUEST_SIZE, throttle.getReqSize())); + } + return settings; + } + + /* + * ========================================================================== RPC Throttle + */ + + /** + * Throttle the specified user. + * @param userName the user to throttle + * @param type the type of throttling + * @param limit the allowed number of request/data per timeUnit + * @param timeUnit the limit time unit + * @return the quota settings + */ + public static QuotaSettings throttleUser(final String userName, final ThrottleType type, + final long limit, final TimeUnit timeUnit) { + return throttle(userName, null, null, type, limit, timeUnit); + } + + /** + * Throttle the specified user on the specified table. + * @param userName the user to throttle + * @param tableName the table to throttle + * @param type the type of throttling + * @param limit the allowed number of request/data per timeUnit + * @param timeUnit the limit time unit + * @return the quota settings + */ + public static QuotaSettings throttleUser(final String userName, final TableName tableName, + final ThrottleType type, final long limit, final TimeUnit timeUnit) { + return throttle(userName, tableName, null, type, limit, timeUnit); + } + + /** + * Throttle the specified user on the specified namespace. + * @param userName the user to throttle + * @param namespace the namespace to throttle + * @param type the type of throttling + * @param limit the allowed number of request/data per timeUnit + * @param timeUnit the limit time unit + * @return the quota settings + */ + public static QuotaSettings throttleUser(final String userName, final String namespace, + final ThrottleType type, final long limit, final TimeUnit timeUnit) { + return throttle(userName, null, namespace, type, limit, timeUnit); + } + + /** + * Remove the throttling for the specified user. + * @param userName the user + * @return the quota settings + */ + public static QuotaSettings unthrottleUser(final String userName) { + return throttle(userName, null, null, null, 0, null); + } + + /** + * Remove the throttling for the specified user on the specified table. + * @param userName the user + * @param tableName the table + * @return the quota settings + */ + public static QuotaSettings unthrottleUser(final String userName, final TableName tableName) { + return throttle(userName, tableName, null, null, 0, null); + } + + /** + * Remove the throttling for the specified user on the specified namespace. + * @param userName the user + * @param namespace the namespace + * @return the quota settings + */ + public static QuotaSettings unthrottleUser(final String userName, final String namespace) { + return throttle(userName, null, namespace, null, 0, null); + } + + /** + * Throttle the specified table. + * @param tableName the table to throttle + * @param type the type of throttling + * @param limit the allowed number of request/data per timeUnit + * @param timeUnit the limit time unit + * @return the quota settings + */ + public static QuotaSettings throttleTable(final TableName tableName, final ThrottleType type, + final long limit, final TimeUnit timeUnit) { + return throttle(null, tableName, null, type, limit, timeUnit); + } + + /** + * Remove the throttling for the specified table. + * @param tableName the table + * @return the quota settings + */ + public static QuotaSettings unthrottleTable(final TableName tableName) { + return throttle(null, tableName, null, null, 0, null); + } + + /** + * Throttle the specified namespace. + * @param namespace the namespace to throttle + * @param type the type of throttling + * @param limit the allowed number of request/data per timeUnit + * @param timeUnit the limit time unit + * @return the quota settings + */ + public static QuotaSettings throttleNamespace(final String namespace, final ThrottleType type, + final long limit, final TimeUnit timeUnit) { + return throttle(null, null, namespace, type, limit, timeUnit); + } + + /** + * Remove the throttling for the specified namespace. + * @param namespace the namespace + * @return the quota settings + */ + public static QuotaSettings unthrottleNamespace(final String namespace) { + return throttle(null, null, namespace, null, 0, null); + } + + /* Throttle helper */ + private static QuotaSettings throttle(final String userName, final TableName tableName, + final String namespace, final ThrottleType type, final long limit, final TimeUnit timeUnit) { + QuotaProtos.ThrottleRequest.Builder builder = QuotaProtos.ThrottleRequest.newBuilder(); + if (type != null) { + builder.setType(ProtobufUtil.toProtoThrottleType(type)); + } + if (timeUnit != null) { + builder.setTimedQuota(ProtobufUtil.toTimedQuota(limit, timeUnit, QuotaScope.MACHINE)); + } + return new ThrottleSettings(userName, tableName, namespace, builder.build()); + } + + /* + * ========================================================================== Global Settings + */ + + /** + * Set the "bypass global settings" for the specified user + * @param userName the user to throttle + * @param bypassGlobals true if the global settings should be bypassed + * @return the quota settings + */ + public static QuotaSettings bypassGlobals(final String userName, final boolean bypassGlobals) { + return new QuotaGlobalsSettingsBypass(userName, null, null, bypassGlobals); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java new file mode 100644 index 0000000..b89f329 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; + +/** + * Helper class to interact with the quota table. + * + * <pre> + * ROW-KEY FAM/QUAL DATA + * n.<namespace> q:s <global-quotas> + * t.<table> q:s <global-quotas> + * u.<user> q:s <global-quotas> + * u.<user> q:s.<table> <table-quotas> + * u.<user> q:s.<ns>: <namespace-quotas> + * </pre> + */ [email protected] [email protected] +public class QuotaTableUtil { + private static final Log LOG = LogFactory.getLog(QuotaTableUtil.class); + + /** System table for quotas */ + public static final TableName QUOTA_TABLE_NAME = TableName.valueOf( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "quota"); + + protected static final byte[] QUOTA_FAMILY_INFO = Bytes.toBytes("q"); + protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u"); + protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s"); + protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s."); + protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u."); + protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t."); + protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n."); + + /* + * ========================================================================= Quota "settings" + * helpers + */ + public static Quotas getTableQuota(final Connection connection, final TableName table) + throws IOException { + return getQuotas(connection, getTableRowKey(table)); + } + + public static Quotas getNamespaceQuota(final Connection connection, final String namespace) + throws IOException { + return getQuotas(connection, getNamespaceRowKey(namespace)); + } + + public static Quotas getUserQuota(final Connection connection, final String user) + throws IOException { + return getQuotas(connection, getUserRowKey(user)); + } + + public static Quotas getUserQuota(final Connection connection, final String user, + final TableName table) throws IOException { + return getQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table)); + } + + public static Quotas getUserQuota(final Connection connection, final String user, + final String namespace) throws IOException { + return getQuotas(connection, getUserRowKey(user), + getSettingsQualifierForUserNamespace(namespace)); + } + + private static Quotas getQuotas(final Connection connection, final byte[] rowKey) + throws IOException { + return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS); + } + + private static Quotas getQuotas(final Connection connection, final byte[] rowKey, + final byte[] qualifier) throws IOException { + Get get = new Get(rowKey); + get.addColumn(QUOTA_FAMILY_INFO, qualifier); + Result result = doGet(connection, get); + if (result.isEmpty()) { + return null; + } + return quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier)); + } + + public static Get makeGetForTableQuotas(final TableName table) { + Get get = new Get(getTableRowKey(table)); + get.addFamily(QUOTA_FAMILY_INFO); + return get; + } + + public static Get makeGetForNamespaceQuotas(final String namespace) { + Get get = new Get(getNamespaceRowKey(namespace)); + get.addFamily(QUOTA_FAMILY_INFO); + return get; + } + + public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables, + final Iterable<String> namespaces) { + Get get = new Get(getUserRowKey(user)); + get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); + for (final TableName table : tables) { + get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserTable(table)); + } + for (final String ns : namespaces) { + get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserNamespace(ns)); + } + return get; + } + + public static Scan makeScan(final QuotaFilter filter) { + Scan scan = new Scan(); + scan.addFamily(QUOTA_FAMILY_INFO); + if (filter != null && !filter.isNull()) { + scan.setFilter(makeFilter(filter)); + } + return scan; + } + + /** + * converts quotafilter to serializeable filterlists. + */ + public static Filter makeFilter(final QuotaFilter filter) { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + if (!Strings.isEmpty(filter.getUserFilter())) { + FilterList userFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE); + boolean hasFilter = false; + + if (!Strings.isEmpty(filter.getNamespaceFilter())) { + FilterList nsFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL); + nsFilters.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator( + getUserRowKeyRegex(filter.getUserFilter()), 0))); + nsFilters.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, + new RegexStringComparator(getSettingsQualifierRegexForUserNamespace(filter + .getNamespaceFilter()), 0))); + userFilters.addFilter(nsFilters); + hasFilter = true; + } + if (!Strings.isEmpty(filter.getTableFilter())) { + FilterList tableFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL); + tableFilters.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, + new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0))); + tableFilters.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, + new RegexStringComparator( + getSettingsQualifierRegexForUserTable(filter.getTableFilter()), 0))); + userFilters.addFilter(tableFilters); + hasFilter = true; + } + if (!hasFilter) { + userFilters.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, + new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0))); + } + + filterList.addFilter(userFilters); + } else if (!Strings.isEmpty(filter.getTableFilter())) { + filterList.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator( + getTableRowKeyRegex(filter.getTableFilter()), 0))); + } else if (!Strings.isEmpty(filter.getNamespaceFilter())) { + filterList.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator( + getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0))); + } + return filterList; + } + + public static interface UserQuotasVisitor { + void visitUserQuotas(final String userName, final Quotas quotas) throws IOException; + + void visitUserQuotas(final String userName, final TableName table, final Quotas quotas) + throws IOException; + + void visitUserQuotas(final String userName, final String namespace, final Quotas quotas) + throws IOException; + } + + public static interface TableQuotasVisitor { + void visitTableQuotas(final TableName tableName, final Quotas quotas) throws IOException; + } + + public static interface NamespaceQuotasVisitor { + void visitNamespaceQuotas(final String namespace, final Quotas quotas) throws IOException; + } + + public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor, + NamespaceQuotasVisitor { + } + + public static void parseResult(final Result result, final QuotasVisitor visitor) + throws IOException { + byte[] row = result.getRow(); + if (isNamespaceRowKey(row)) { + parseNamespaceResult(result, visitor); + } else if (isTableRowKey(row)) { + parseTableResult(result, visitor); + } else if (isUserRowKey(row)) { + parseUserResult(result, visitor); + } else { + LOG.warn("unexpected row-key: " + Bytes.toString(row)); + } + } + + public static void + parseNamespaceResult(final Result result, final NamespaceQuotasVisitor visitor) + throws IOException { + String namespace = getNamespaceFromRowKey(result.getRow()); + parseNamespaceResult(namespace, result, visitor); + } + + protected static void parseNamespaceResult(final String namespace, final Result result, + final NamespaceQuotasVisitor visitor) throws IOException { + byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); + if (data != null) { + Quotas quotas = quotasFromData(data); + visitor.visitNamespaceQuotas(namespace, quotas); + } + } + + public static void parseTableResult(final Result result, final TableQuotasVisitor visitor) + throws IOException { + TableName table = getTableFromRowKey(result.getRow()); + parseTableResult(table, result, visitor); + } + + protected static void parseTableResult(final TableName table, final Result result, + final TableQuotasVisitor visitor) throws IOException { + byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); + if (data != null) { + Quotas quotas = quotasFromData(data); + visitor.visitTableQuotas(table, quotas); + } + } + + public static void parseUserResult(final Result result, final UserQuotasVisitor visitor) + throws IOException { + String userName = getUserFromRowKey(result.getRow()); + parseUserResult(userName, result, visitor); + } + + protected static void parseUserResult(final String userName, final Result result, + final UserQuotasVisitor visitor) throws IOException { + Map<byte[], byte[]> familyMap = result.getFamilyMap(QUOTA_FAMILY_INFO); + if (familyMap == null || familyMap.isEmpty()) return; + + for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) { + Quotas quotas = quotasFromData(entry.getValue()); + if (Bytes.startsWith(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX)) { + String name = Bytes.toString(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX.length); + if (name.charAt(name.length() - 1) == TableName.NAMESPACE_DELIM) { + String namespace = name.substring(0, name.length() - 1); + visitor.visitUserQuotas(userName, namespace, quotas); + } else { + TableName table = TableName.valueOf(name); + visitor.visitUserQuotas(userName, table, quotas); + } + } else if (Bytes.equals(entry.getKey(), QUOTA_QUALIFIER_SETTINGS)) { + visitor.visitUserQuotas(userName, quotas); + } + } + } + + /* + * ========================================================================= Quotas protobuf + * helpers + */ + protected static Quotas quotasFromData(final byte[] data) throws IOException { + int magicLen = ProtobufUtil.lengthOfPBMagic(); + if (!ProtobufUtil.isPBMagicPrefix(data, 0, magicLen)) { + throw new IOException("Missing pb magic prefix"); + } + return Quotas.parseFrom(new ByteArrayInputStream(data, magicLen, data.length - magicLen)); + } + + protected static byte[] quotasToData(final Quotas data) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + stream.write(ProtobufUtil.PB_MAGIC); + data.writeTo(stream); + return stream.toByteArray(); + } + + public static boolean isEmptyQuota(final Quotas quotas) { + boolean hasSettings = false; + hasSettings |= quotas.hasThrottle(); + hasSettings |= quotas.hasBypassGlobals(); + return !hasSettings; + } + + /* + * ========================================================================= HTable helpers + */ + protected static Result doGet(final Connection connection, final Get get) throws IOException { + try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { + return table.get(get); + } + } + + protected static Result[] doGet(final Connection connection, final List<Get> gets) + throws IOException { + try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { + return table.get(gets); + } + } + + /* + * ========================================================================= Quota table row key + * helpers + */ + protected static byte[] getUserRowKey(final String user) { + return Bytes.add(QUOTA_USER_ROW_KEY_PREFIX, Bytes.toBytes(user)); + } + + protected static byte[] getTableRowKey(final TableName table) { + return Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, table.getName()); + } + + protected static byte[] getNamespaceRowKey(final String namespace) { + return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, Bytes.toBytes(namespace)); + } + + protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) { + return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName()); + } + + protected static byte[] getSettingsQualifierForUserNamespace(final String namespace) { + return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, + Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM)); + } + + protected static String getUserRowKeyRegex(final String user) { + return getRowKeyRegEx(QUOTA_USER_ROW_KEY_PREFIX, user); + } + + protected static String getTableRowKeyRegex(final String table) { + return getRowKeyRegEx(QUOTA_TABLE_ROW_KEY_PREFIX, table); + } + + protected static String getNamespaceRowKeyRegex(final String namespace) { + return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace); + } + + private static String getRowKeyRegEx(final byte[] prefix, final String regex) { + return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$'; + } + + protected static String getSettingsQualifierRegexForUserTable(final String table) { + return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + table + "(?<!" + + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + ")$"; + } + + protected static String getSettingsQualifierRegexForUserNamespace(final String namespace) { + return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + namespace + + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + '$'; + } + + protected static boolean isNamespaceRowKey(final byte[] key) { + return Bytes.startsWith(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX); + } + + protected static String getNamespaceFromRowKey(final byte[] key) { + return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length); + } + + protected static boolean isTableRowKey(final byte[] key) { + return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX); + } + + protected static TableName getTableFromRowKey(final byte[] key) { + return TableName.valueOf(Bytes.toString(key, QUOTA_TABLE_ROW_KEY_PREFIX.length)); + } + + protected static boolean isUserRowKey(final byte[] key) { + return Bytes.startsWith(key, QUOTA_USER_ROW_KEY_PREFIX); + } + + protected static String getUserFromRowKey(final byte[] key) { + return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaType.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaType.java new file mode 100644 index 0000000..1c7e822 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Describe the Quota Type. + */ [email protected] [email protected] +public enum QuotaType { + THROTTLE, GLOBAL_BYPASS, +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java new file mode 100644 index 0000000..dc036b4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos; + [email protected] [email protected] +class ThrottleSettings extends QuotaSettings { + private final QuotaProtos.ThrottleRequest proto; + + ThrottleSettings(final String userName, final TableName tableName, final String namespace, + final QuotaProtos.ThrottleRequest proto) { + super(userName, tableName, namespace); + this.proto = proto; + } + + public ThrottleType getThrottleType() { + return ProtobufUtil.toThrottleType(proto.getType()); + } + + public long getSoftLimit() { + return proto.hasTimedQuota() ? proto.getTimedQuota().getSoftLimit() : -1; + } + + public TimeUnit getTimeUnit() { + return proto.hasTimedQuota() ? ProtobufUtil.toTimeUnit(proto.getTimedQuota().getTimeUnit()) + : null; + } + + @Override + public QuotaType getQuotaType() { + return QuotaType.THROTTLE; + } + + @Override + protected void setupSetQuotaRequest(SetQuotaRequest.Builder builder) { + builder.setThrottle(proto); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("TYPE => THROTTLE"); + if (proto.hasType()) { + builder.append(", THROTTLE_TYPE => "); + builder.append(proto.getType().toString()); + } + if (proto.hasTimedQuota()) { + QuotaProtos.TimedQuota timedQuota = proto.getTimedQuota(); + builder.append(", LIMIT => "); + if (timedQuota.hasSoftLimit()) { + switch (getThrottleType()) { + case REQUEST_NUMBER: + builder.append(String.format("%dreq", timedQuota.getSoftLimit())); + break; + case REQUEST_SIZE: + builder.append(sizeToString(timedQuota.getSoftLimit())); + break; + default: + throw new RuntimeException("Invalid throttle type: " + getThrottleType()); + } + } else if (timedQuota.hasShare()) { + builder.append(String.format("%.2f%%", timedQuota.getShare())); + } + builder.append('/'); + builder.append(timeToString(ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()))); + if (timedQuota.hasScope()) { + builder.append(", SCOPE => "); + builder.append(timedQuota.getScope().toString()); + } + } else { + builder.append(", LIMIT => NONE"); + } + return builder.toString(); + } + + static ThrottleSettings fromTimedQuota(final String userName, final TableName tableName, + final String namespace, ThrottleType type, QuotaProtos.TimedQuota timedQuota) { + QuotaProtos.ThrottleRequest.Builder builder = QuotaProtos.ThrottleRequest.newBuilder(); + builder.setType(ProtobufUtil.toProtoThrottleType(type)); + builder.setTimedQuota(timedQuota); + return new ThrottleSettings(userName, tableName, namespace, builder.build()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java new file mode 100644 index 0000000..bb5c093 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Describe the Throttle Type. + */ [email protected] [email protected] +public enum ThrottleType { + /** Throttling based on the number of request per time-unit */ + REQUEST_NUMBER, + + /** Throttling based on the read+write data size */ + REQUEST_SIZE, +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java new file mode 100644 index 0000000..547f902 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Describe the throttling result. TODO: At some point this will be handled on the client side to + * prevent operation to go on the server if the waitInterval is grater than the one got as result of + * this exception. + */ [email protected] [email protected] +public class ThrottlingException extends QuotaExceededException { + private static final long serialVersionUID = 1406576492085155743L; + + @InterfaceAudience.Public + @InterfaceStability.Evolving + public enum Type { + NumRequestsExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded, WriteSizeExceeded, + ReadSizeExceeded, + } + + private static final String[] MSG_TYPE = new String[] { "number of requests exceeded", + "number of read requests exceeded", "number of write requests exceeded", + "write size limit exceeded", "read size limit exceeded", }; + + private static final String MSG_WAIT = " - wait "; + + private long waitInterval; + private Type type; + + public ThrottlingException(String msg) { + super(msg); + + // Dirty workaround to get the information after + // ((RemoteException)e.getCause()).unwrapRemoteException() + for (int i = 0; i < MSG_TYPE.length; ++i) { + int index = msg.indexOf(MSG_TYPE[i]); + if (index >= 0) { + String waitTimeStr = msg.substring(index + MSG_TYPE[i].length() + MSG_WAIT.length()); + type = Type.values()[i]; + waitInterval = timeFromString(waitTimeStr); + break; + } + } + } + + public ThrottlingException(final Type type, final long waitInterval, final String msg) { + super(msg); + this.waitInterval = waitInterval; + this.type = type; + } + + public Type getType() { + return this.type; + } + + public long getWaitInterval() { + return this.waitInterval; + } + + public static void throwNumRequestsExceeded(final long waitInterval) throws ThrottlingException { + throwThrottlingException(Type.NumRequestsExceeded, waitInterval); + } + + public static void throwNumReadRequestsExceeded(final long waitInterval) + throws ThrottlingException { + throwThrottlingException(Type.NumReadRequestsExceeded, waitInterval); + } + + public static void throwNumWriteRequestsExceeded(final long waitInterval) + throws ThrottlingException { + throwThrottlingException(Type.NumWriteRequestsExceeded, waitInterval); + } + + public static void throwWriteSizeExceeded(final long waitInterval) throws ThrottlingException { + throwThrottlingException(Type.WriteSizeExceeded, waitInterval); + } + + public static void throwReadSizeExceeded(final long waitInterval) throws ThrottlingException { + throwThrottlingException(Type.ReadSizeExceeded, waitInterval); + } + + private static void throwThrottlingException(final Type type, final long waitInterval) + throws ThrottlingException { + String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + formatTime(waitInterval); + throw new ThrottlingException(type, waitInterval, msg); + } + + public static String formatTime(long timeDiff) { + StringBuilder buf = new StringBuilder(); + long hours = timeDiff / (60 * 60 * 1000); + long rem = (timeDiff % (60 * 60 * 1000)); + long minutes = rem / (60 * 1000); + rem = rem % (60 * 1000); + float seconds = rem / 1000.0f; + + if (hours != 0) { + buf.append(hours); + buf.append("hrs, "); + } + if (minutes != 0) { + buf.append(minutes); + buf.append("mins, "); + } + buf.append(String.format("%.2fsec", seconds)); + return buf.toString(); + } + + private static long timeFromString(String timeDiff) { + Pattern[] patterns = + new Pattern[] { Pattern.compile("^(\\d+\\.\\d\\d)sec"), + Pattern.compile("^(\\d+)mins, (\\d+\\.\\d\\d)sec"), + Pattern.compile("^(\\d+)hrs, (\\d+)mins, (\\d+\\.\\d\\d)sec") }; + + for (int i = 0; i < patterns.length; ++i) { + Matcher m = patterns[i].matcher(timeDiff); + if (m.find()) { + long time = Math.round(Float.parseFloat(m.group(1 + i)) * 1000); + if (i > 0) { + time += Long.parseLong(m.group(i)) * (60 * 1000); + } + if (i > 1) { + time += Long.parseLong(m.group(i - 1)) * (60 * 60 * 1000); + } + return time; + } + } + + return -1; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index e7c9522..cf6a9e1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -362,6 +362,24 @@ public class Bytes { final byte [] b2) { return toString(b1, 0, b1.length) + sep + toString(b2, 0, b2.length); } + + /** + * This method will convert utf8 encoded bytes into a string. If the given byte array is null, + * this method will return null. + * @param b Presumed UTF-8 encoded byte array. + * @param off offset into array + * @return String made from <code>b</code> or null + */ + public static String toString(final byte[] b, int off) { + if (b == null) { + return null; + } + int len = b.length - off; + if (len <= 0) { + return ""; + } + return new String(b, off, len, UTF8_CHARSET); + } /** * This method will convert utf8 encoded bytes into a string. If http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java index 071250b..416ef02 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java @@ -114,4 +114,11 @@ public class Sleeper { triggerWake = false; } } + + /** + * @return the sleep period in milliseconds + */ + public final int getPeriod() { + return period; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-protocol/pom.xml b/hbase-protocol/pom.xml index 3b0356d..08b379c 100644 --- a/hbase-protocol/pom.xml +++ b/hbase-protocol/pom.xml @@ -176,6 +176,7 @@ <include>MapReduce.proto</include> <include>Master.proto</include> <include>MultiRowMutation.proto</include> + <include>Quota.proto</include> <include>RegionServerStatus.proto</include> <include>RowProcessor.proto</include> <include>RPC.proto</include>
