http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/src/main/protobuf/HBase.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/HBase.proto b/hbase-protocol/src/main/protobuf/HBase.proto index ca09777..f78163e 100644 --- a/hbase-protocol/src/main/protobuf/HBase.proto +++ b/hbase-protocol/src/main/protobuf/HBase.proto @@ -180,6 +180,16 @@ message ProcedureDescription { message EmptyMsg { } +enum TimeUnit { + NANOSECONDS = 1; + MICROSECONDS = 2; + MILLISECONDS = 3; + SECONDS = 4; + MINUTES = 5; + HOURS = 6; + DAYS = 7; +} + message LongMsg { required int64 long_msg = 1; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/src/main/protobuf/Master.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Master.proto b/hbase-protocol/src/main/protobuf/Master.proto index 462bb6a..e2814e7 100644 --- a/hbase-protocol/src/main/protobuf/Master.proto +++ b/hbase-protocol/src/main/protobuf/Master.proto @@ -28,6 +28,7 @@ option optimize_for = SPEED; import "HBase.proto"; import "Client.proto"; import "ClusterStatus.proto"; +import "Quota.proto"; /* Column-level protobufs */ @@ -371,6 +372,20 @@ message IsProcedureDoneResponse { optional ProcedureDescription snapshot = 2; } +message SetQuotaRequest { + optional string user_name = 1; + optional string user_group = 2; + optional string namespace = 3; + optional TableName table_name = 4; + + optional bool remove_all = 5; + optional bool bypass_globals = 6; + optional ThrottleRequest throttle = 7; +} + +message SetQuotaResponse { +} + message MajorCompactionTimestampRequest { required TableName table_name = 1; } @@ -597,6 +612,9 @@ service MasterService { rpc ListTableNamesByNamespace(ListTableNamesByNamespaceRequest) returns(ListTableNamesByNamespaceResponse); + /** Apply the new quota settings */ + rpc SetQuota(SetQuotaRequest) returns(SetQuotaResponse); + /** Returns the timestamp of the last major compaction */ rpc getLastMajorCompactionTimestamp(MajorCompactionTimestampRequest) returns(MajorCompactionTimestampResponse); http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/src/main/protobuf/Quota.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Quota.proto b/hbase-protocol/src/main/protobuf/Quota.proto new file mode 100644 index 0000000..6ef15fe --- /dev/null +++ b/hbase-protocol/src/main/protobuf/Quota.proto @@ -0,0 +1,73 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "QuotaProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; + +enum QuotaScope { + CLUSTER = 1; + MACHINE = 2; +} + +message TimedQuota { + required TimeUnit time_unit = 1; + optional uint64 soft_limit = 2; + optional float share = 3; + optional QuotaScope scope = 4 [default = MACHINE]; +} + +enum ThrottleType { + REQUEST_NUMBER = 1; + REQUEST_SIZE = 2; + WRITE_NUMBER = 3; + WRITE_SIZE = 4; + READ_NUMBER = 5; + READ_SIZE = 6; +} + +message Throttle { + optional TimedQuota req_num = 1; + optional TimedQuota req_size = 2; + + optional TimedQuota write_num = 3; + optional TimedQuota write_size = 4; + + optional TimedQuota read_num = 5; + optional TimedQuota read_size = 6; +} + +message ThrottleRequest { + optional ThrottleType type = 1; + optional TimedQuota timed_quota = 2; +} + +enum QuotaType { + THROTTLE = 1; +} + +message Quotas { + optional bool bypass_globals = 1 [default = false]; + optional Throttle throttle = 2; +} + +message QuotaUsage { +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java index 98c0563..49f21d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; import java.io.IOException; import java.util.List; @@ -467,4 +468,54 @@ public abstract class BaseMasterAndRegionObserver extends BaseRegionObserver public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName) throws IOException { } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java index 4748a1b..99a8552 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; import java.io.IOException; import java.util.List; @@ -462,4 +463,53 @@ public class BaseMasterObserver implements MasterObserver { TableName tableName) throws IOException { } + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException { + } + + @Override + public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException { + } + + @Override + public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index 2d99754..5dc50da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; /** * Defines coprocessor hooks for interacting with operations on the @@ -842,4 +843,108 @@ public interface MasterObserver extends Coprocessor { */ void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName) throws IOException; + + /** + * Called before the quota for the user is stored. + * @param ctx the environment to interact with the framework and master + * @param userName the name of user + * @param quotas the quota settings + * @throws IOException + */ + void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException; + + /** + * Called after the quota for the user is stored. + * @param ctx the environment to interact with the framework and master + * @param userName the name of user + * @param quotas the quota settings + * @throws IOException + */ + void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final Quotas quotas) throws IOException; + + /** + * Called before the quota for the user on the specified table is stored. + * @param ctx the environment to interact with the framework and master + * @param userName the name of user + * @param tableName the name of the table + * @param quotas the quota settings + * @throws IOException + */ + void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException; + + /** + * Called after the quota for the user on the specified table is stored. + * @param ctx the environment to interact with the framework and master + * @param userName the name of user + * @param tableName the name of the table + * @param quotas the quota settings + * @throws IOException + */ + void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final TableName tableName, final Quotas quotas) throws IOException; + + /** + * Called before the quota for the user on the specified namespace is stored. + * @param ctx the environment to interact with the framework and master + * @param userName the name of user + * @param namespace the name of the namespace + * @param quotas the quota settings + * @throws IOException + */ + void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException; + + /** + * Called after the quota for the user on the specified namespace is stored. + * @param ctx the environment to interact with the framework and master + * @param userName the name of user + * @param namespace the name of the namespace + * @param quotas the quota settings + * @throws IOException + */ + void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String userName, final String namespace, final Quotas quotas) throws IOException; + + /** + * Called before the quota for the table is stored. + * @param ctx the environment to interact with the framework and master + * @param tableName the name of the table + * @param quotas the quota settings + * @throws IOException + */ + void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException; + + /** + * Called after the quota for the table is stored. + * @param ctx the environment to interact with the framework and master + * @param tableName the name of the table + * @param quotas the quota settings + * @throws IOException + */ + void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final TableName tableName, final Quotas quotas) throws IOException; + + /** + * Called before the quota for the namespace is stored. + * @param ctx the environment to interact with the framework and master + * @param namespace the name of the namespace + * @param quotas the quota settings + * @throws IOException + */ + void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException; + + /** + * Called after the quota for the namespace is stored. + * @param ctx the environment to interact with the framework and master + * @param namespace the name of the namespace + * @param quotas the quota settings + * @throws IOException + */ + void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, + final String namespace, final Quotas quotas) throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ad49036..891a999 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -2477,6 +2477,7 @@ public class RpcServer implements RpcServerInterface { } } + @Override public RpcScheduler getScheduler() { return scheduler; } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index b133ed6..013d256 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -73,4 +73,6 @@ public interface RpcServerInterface { */ @VisibleForTesting void refreshAuthManager(PolicyProvider pp); + + RpcScheduler getScheduler(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0b6200e..a7006bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; @@ -290,6 +291,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { SnapshotManager snapshotManager; // monitor for distributed procedures MasterProcedureManagerHost mpmHost; + + private MasterQuotaManager quotaManager; /** flag used in test cases in order to simulate RS failures during master initialization */ private volatile boolean initializationBeforeMetaAssignment = false; @@ -721,6 +724,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server { status.setStatus("Starting namespace manager"); initNamespace(); + + status.setStatus("Starting quota manager"); + initQuotaManager(); if (this.cpHost != null) { try { @@ -761,6 +767,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server { zombieDetector.interrupt(); } + private void initQuotaManager() throws IOException { + quotaManager = new MasterQuotaManager(this); + quotaManager.start(); + } + /** * Create a {@link ServerManager} instance. * @param master @@ -1063,6 +1074,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // Clean up and close up shop if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); + if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); @@ -1861,6 +1873,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server { public MasterCoprocessorHost getMasterCoprocessorHost() { return cpHost; } + + @Override + public MasterQuotaManager getMasterQuotaManager() { + return quotaManager; + } @Override public ServerName getServerName() { http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 0fffdab..3c92f72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; import java.io.IOException; import java.util.List; @@ -930,6 +931,111 @@ public class MasterCoprocessorHost } }); } + + public void preSetUserQuota(final String user, final Quotas quotas) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSetUserQuota(ctx, user, quotas); + } + }); + } + + public void postSetUserQuota(final String user, final Quotas quotas) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postSetUserQuota(ctx, user, quotas); + } + }); + } + + public void preSetUserQuota(final String user, final TableName table, final Quotas quotas) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSetUserQuota(ctx, user, table, quotas); + } + }); + } + + public void postSetUserQuota(final String user, final TableName table, final Quotas quotas) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postSetUserQuota(ctx, user, table, quotas); + } + }); + } + + public void preSetUserQuota(final String user, final String namespace, final Quotas quotas) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSetUserQuota(ctx, user, namespace, quotas); + } + }); + } + + public void postSetUserQuota(final String user, final String namespace, final Quotas quotas) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postSetUserQuota(ctx, user, namespace, quotas); + } + }); + } + + public void preSetTableQuota(final TableName table, final Quotas quotas) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSetTableQuota(ctx, table, quotas); + } + }); + } + + public void postSetTableQuota(final TableName table, final Quotas quotas) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postSetTableQuota(ctx, table, quotas); + } + }); + } + + public void preSetNamespaceQuota(final String namespace, final Quotas quotas) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSetNamespaceQuota(ctx, namespace, quotas); + } + }); + } + + public void postSetNamespaceQuota(final String namespace, final Quotas quotas) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postSetNamespaceQuota(ctx, namespace, quotas); + } + }); + } private static abstract class CoprocessorOperation extends ObserverContext<MasterCoprocessorEnvironment> { http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 98f7507..e3e4099 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -129,6 +129,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanReq import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; @@ -1290,4 +1292,14 @@ public class MasterRpcServices extends RSRpcServices response.setEnabled(master.isBalancerOn()); return response.build(); } + + @Override + public SetQuotaResponse setQuota(RpcController c, SetQuotaRequest req) throws ServiceException { + try { + master.checkInitialized(); + return master.getMasterQuotaManager().setQuota(req); + } catch (Exception e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 458e53c..dbe7b68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import com.google.protobuf.Service; @@ -266,4 +267,10 @@ public interface MasterServices extends Server { * @throws IOException */ public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException; + + /** + * @return Master's instance of {@link MasterQuotaManager} + */ + MasterQuotaManager getMasterQuotaManager(); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java new file mode 100644 index 0000000..654e8fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; + [email protected] [email protected] +public class DefaultOperationQuota implements OperationQuota { + private static final Log LOG = LogFactory.getLog(DefaultOperationQuota.class); + + private final List<QuotaLimiter> limiters; + private long writeAvailable = 0; + private long readAvailable = 0; + private long writeConsumed = 0; + private long readConsumed = 0; + + private AvgOperationSize avgOpSize = new AvgOperationSize(); + + public DefaultOperationQuota(final QuotaLimiter... limiters) { + this(Arrays.asList(limiters)); + } + + /** + * NOTE: The order matters. It should be something like [user, table, namespace, global] + */ + public DefaultOperationQuota(final List<QuotaLimiter> limiters) { + this.limiters = limiters; + } + + @Override + public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException { + writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); + readConsumed = estimateConsume(OperationType.GET, numReads, 100); + readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + + writeAvailable = Long.MAX_VALUE; + readAvailable = Long.MAX_VALUE; + for (final QuotaLimiter limiter : limiters) { + if (limiter.isBypass()) continue; + + limiter.checkQuota(writeConsumed, readConsumed); + readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); + writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); + } + + for (final QuotaLimiter limiter : limiters) { + limiter.grabQuota(writeConsumed, readConsumed); + } + } + + @Override + public void close() { + // Calculate and set the average size of get, scan and mutate for the current operation + long getSize = avgOpSize.getAvgOperationSize(OperationType.GET); + long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN); + long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE); + for (final QuotaLimiter limiter : limiters) { + limiter.addOperationSize(OperationType.GET, getSize); + limiter.addOperationSize(OperationType.SCAN, scanSize); + limiter.addOperationSize(OperationType.MUTATE, mutationSize); + } + + // Adjust the quota consumed for the specified operation + long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed; + long readDiff = + (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize + .getOperationSize(OperationType.SCAN)) - readConsumed; + for (final QuotaLimiter limiter : limiters) { + if (writeDiff != 0) limiter.consumeWrite(writeDiff); + if (readDiff != 0) limiter.consumeRead(readDiff); + } + } + + @Override + public long getReadAvailable() { + return readAvailable; + } + + @Override + public long getWriteAvailable() { + return writeAvailable; + } + + @Override + public void addGetResult(final Result result) { + avgOpSize.addGetResult(result); + } + + @Override + public void addScanResult(final List<Result> results) { + avgOpSize.addScanResult(results); + } + + @Override + public void addMutation(final Mutation mutation) { + avgOpSize.addMutation(mutation); + } + + @Override + public long getAvgOperationSize(OperationType type) { + return avgOpSize.getAvgOperationSize(type); + } + + private long estimateConsume(final OperationType type, int numReqs, long avgSize) { + if (numReqs > 0) { + for (final QuotaLimiter limiter : limiters) { + long size = limiter.getAvgOperationSize(type); + if (size > 0) { + avgSize = size; + break; + } + } + return avgSize * numReqs; + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java new file mode 100644 index 0000000..af7efb2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -0,0 +1,441 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.HashSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.handler.CreateTableHandler; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.ThrottleRequest; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota; + +/** + * Master Quota Manager. It is responsible for initialize the quota table on the first-run and + * provide the admin operations to interact with the quota table. TODO: FUTURE: The master will be + * responsible to notify each RS of quota changes and it will do the "quota aggregation" when the + * QuotaScope is CLUSTER. + */ [email protected] [email protected] +public class MasterQuotaManager { + private static final Log LOG = LogFactory.getLog(MasterQuotaManager.class); + + private final MasterServices masterServices; + private NamedLock<String> namespaceLocks; + private NamedLock<TableName> tableLocks; + private NamedLock<String> userLocks; + private boolean enabled = false; + + public MasterQuotaManager(final MasterServices masterServices) { + this.masterServices = masterServices; + } + + public void start() throws IOException { + // If the user doesn't want the quota support skip all the initializations. + if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) { + LOG.info("Quota support disabled"); + return; + } + + // Create the quota table if missing + if (!MetaTableAccessor.tableExists(masterServices.getConnection(), + QuotaUtil.QUOTA_TABLE_NAME)) { + LOG.info("Quota table not found. Creating..."); + createQuotaTable(); + } + + LOG.info("Initializing quota support"); + namespaceLocks = new NamedLock<String>(); + tableLocks = new NamedLock<TableName>(); + userLocks = new NamedLock<String>(); + + enabled = true; + } + + public void stop() { + } + + public boolean isQuotaEnabled() { + return enabled; + } + + /* + * ========================================================================== Admin operations to + * manage the quota table + */ + public SetQuotaResponse setQuota(final SetQuotaRequest req) throws IOException, + InterruptedException { + checkQuotaSupport(); + + if (req.hasUserName()) { + userLocks.lock(req.getUserName()); + try { + if (req.hasTableName()) { + setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req); + } else if (req.hasNamespace()) { + setUserQuota(req.getUserName(), req.getNamespace(), req); + } else { + setUserQuota(req.getUserName(), req); + } + } finally { + userLocks.unlock(req.getUserName()); + } + } else if (req.hasTableName()) { + TableName table = ProtobufUtil.toTableName(req.getTableName()); + tableLocks.lock(table); + try { + setTableQuota(table, req); + } finally { + tableLocks.unlock(table); + } + } else if (req.hasNamespace()) { + namespaceLocks.lock(req.getNamespace()); + try { + setNamespaceQuota(req.getNamespace(), req); + } finally { + namespaceLocks.unlock(req.getNamespace()); + } + } else { + throw new DoNotRetryIOException(new UnsupportedOperationException( + "a user, a table or a namespace must be specified")); + } + return SetQuotaResponse.newBuilder().build(); + } + + public void setUserQuota(final String userName, final SetQuotaRequest req) throws IOException, + InterruptedException { + setQuota(req, new SetQuotaOperations() { + @Override + public Quotas fetch() throws IOException { + return QuotaUtil.getUserQuota(masterServices.getConnection(), userName); + } + + @Override + public void update(final Quotas quotas) throws IOException { + QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotas); + } + + @Override + public void delete() throws IOException { + QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName); + } + + @Override + public void preApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotas); + } + + @Override + public void postApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotas); + } + }); + } + + public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req) + throws IOException, InterruptedException { + setQuota(req, new SetQuotaOperations() { + @Override + public Quotas fetch() throws IOException { + return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table); + } + + @Override + public void update(final Quotas quotas) throws IOException { + QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, quotas); + } + + @Override + public void delete() throws IOException { + QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table); + } + + @Override + public void preApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotas); + } + + @Override + public void postApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotas); + } + }); + } + + public void + setUserQuota(final String userName, final String namespace, final SetQuotaRequest req) + throws IOException, InterruptedException { + setQuota(req, new SetQuotaOperations() { + @Override + public Quotas fetch() throws IOException { + return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace); + } + + @Override + public void update(final Quotas quotas) throws IOException { + QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, quotas); + } + + @Override + public void delete() throws IOException { + QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace); + } + + @Override + public void preApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotas); + } + + @Override + public void postApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotas); + } + }); + } + + public void setTableQuota(final TableName table, final SetQuotaRequest req) throws IOException, + InterruptedException { + setQuota(req, new SetQuotaOperations() { + @Override + public Quotas fetch() throws IOException { + return QuotaUtil.getTableQuota(masterServices.getConnection(), table); + } + + @Override + public void update(final Quotas quotas) throws IOException { + QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotas); + } + + @Override + public void delete() throws IOException { + QuotaUtil.deleteTableQuota(masterServices.getConnection(), table); + } + + @Override + public void preApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotas); + } + + @Override + public void postApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotas); + } + }); + } + + public void setNamespaceQuota(final String namespace, final SetQuotaRequest req) + throws IOException, InterruptedException { + setQuota(req, new SetQuotaOperations() { + @Override + public Quotas fetch() throws IOException { + return QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace); + } + + @Override + public void update(final Quotas quotas) throws IOException { + QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, quotas); + } + + @Override + public void delete() throws IOException { + QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace); + } + + @Override + public void preApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotas); + } + + @Override + public void postApply(final Quotas quotas) throws IOException { + masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotas); + } + }); + } + + private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps) + throws IOException, InterruptedException { + if (req.hasRemoveAll() && req.getRemoveAll() == true) { + quotaOps.preApply(null); + quotaOps.delete(); + quotaOps.postApply(null); + return; + } + + // Apply quota changes + Quotas quotas = quotaOps.fetch(); + quotaOps.preApply(quotas); + + Quotas.Builder builder = (quotas != null) ? quotas.toBuilder() : Quotas.newBuilder(); + if (req.hasThrottle()) applyThrottle(builder, req.getThrottle()); + if (req.hasBypassGlobals()) applyBypassGlobals(builder, req.getBypassGlobals()); + + // Submit new changes + quotas = builder.build(); + if (QuotaUtil.isEmptyQuota(quotas)) { + quotaOps.delete(); + } else { + quotaOps.update(quotas); + } + quotaOps.postApply(quotas); + } + + private static interface SetQuotaOperations { + Quotas fetch() throws IOException; + + void delete() throws IOException; + + void update(final Quotas quotas) throws IOException; + + void preApply(final Quotas quotas) throws IOException; + + void postApply(final Quotas quotas) throws IOException; + } + + /* + * ========================================================================== Helpers to apply + * changes to the quotas + */ + private void applyThrottle(final Quotas.Builder quotas, final ThrottleRequest req) + throws IOException { + Throttle.Builder throttle; + + if (req.hasType() && (req.hasTimedQuota() || quotas.hasThrottle())) { + // Validate timed quota if present + if (req.hasTimedQuota()) { + validateTimedQuota(req.getTimedQuota()); + } + + // apply the new settings + throttle = quotas.hasThrottle() ? quotas.getThrottle().toBuilder() : Throttle.newBuilder(); + + switch (req.getType()) { + case REQUEST_NUMBER: + if (req.hasTimedQuota()) { + throttle.setReqNum(req.getTimedQuota()); + } else { + throttle.clearReqNum(); + } + break; + case REQUEST_SIZE: + if (req.hasTimedQuota()) { + throttle.setReqSize(req.getTimedQuota()); + } else { + throttle.clearReqSize(); + } + break; + case WRITE_NUMBER: + if (req.hasTimedQuota()) { + throttle.setWriteNum(req.getTimedQuota()); + } else { + throttle.clearWriteNum(); + } + break; + case WRITE_SIZE: + if (req.hasTimedQuota()) { + throttle.setWriteSize(req.getTimedQuota()); + } else { + throttle.clearWriteSize(); + } + break; + case READ_NUMBER: + if (req.hasTimedQuota()) { + throttle.setReadNum(req.getTimedQuota()); + } else { + throttle.clearReqNum(); + } + break; + case READ_SIZE: + if (req.hasTimedQuota()) { + throttle.setReadSize(req.getTimedQuota()); + } else { + throttle.clearReadSize(); + } + break; + default: + throw new RuntimeException("Invalid throttle type: " + req.getType()); + } + quotas.setThrottle(throttle.build()); + } else { + quotas.clearThrottle(); + } + } + + private void applyBypassGlobals(final Quotas.Builder quotas, boolean bypassGlobals) { + if (bypassGlobals) { + quotas.setBypassGlobals(bypassGlobals); + } else { + quotas.clearBypassGlobals(); + } + } + + private void validateTimedQuota(final TimedQuota timedQuota) throws IOException { + if (timedQuota.getSoftLimit() < 1) { + throw new DoNotRetryIOException(new UnsupportedOperationException( + "The throttle limit must be greater then 0, got " + timedQuota.getSoftLimit())); + } + } + + /* + * ========================================================================== Helpers + */ + + private void checkQuotaSupport() throws IOException { + if (!enabled) { + throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled")); + } + } + + private void createQuotaTable() throws IOException { + HRegionInfo[] newRegions = new HRegionInfo[] { new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME) }; + + masterServices.getExecutorService() + .submit( + new CreateTableHandler(masterServices, masterServices.getMasterFileSystem(), + QuotaUtil.QUOTA_TABLE_DESC, masterServices.getConfiguration(), newRegions, + masterServices).prepare()); + } + + private static class NamedLock<T> { + private HashSet<T> locks = new HashSet<T>(); + + public void lock(final T name) throws InterruptedException { + synchronized (locks) { + while (locks.contains(name)) { + locks.wait(); + } + locks.add(name); + } + } + + public void unlock(final T name) { + synchronized (locks) { + locks.remove(name); + locks.notifyAll(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java new file mode 100644 index 0000000..2463ef7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; + +/** + * Noop operation quota returned when no quota is associated to the user/table + */ [email protected] [email protected] +final class NoopOperationQuota implements OperationQuota { + private static OperationQuota instance = new NoopOperationQuota(); + + private NoopOperationQuota() { + // no-op + } + + public static OperationQuota get() { + return instance; + } + + @Override + public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException { + // no-op + } + + @Override + public void close() { + // no-op + } + + @Override + public void addGetResult(final Result result) { + // no-op + } + + @Override + public void addScanResult(final List<Result> results) { + // no-op + } + + @Override + public void addMutation(final Mutation mutation) { + // no-op + } + + @Override + public long getReadAvailable() { + return Long.MAX_VALUE; + } + + @Override + public long getWriteAvailable() { + return Long.MAX_VALUE; + } + + @Override + public long getAvgOperationSize(OperationType type) { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java new file mode 100644 index 0000000..699fd1a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; + +/** + * Noop quota limiter returned when no limiter is associated to the user/table + */ [email protected] [email protected] +final class NoopQuotaLimiter implements QuotaLimiter { + private static QuotaLimiter instance = new NoopQuotaLimiter(); + + private NoopQuotaLimiter() { + // no-op + } + + @Override + public void checkQuota(long estimateWriteSize, long estimateReadSize) throws ThrottlingException { + // no-op + } + + @Override + public void grabQuota(long writeSize, long readSize) { + // no-op + } + + @Override + public void consumeWrite(final long size) { + // no-op + } + + @Override + public void consumeRead(final long size) { + // no-op + } + + @Override + public boolean isBypass() { + return true; + } + + @Override + public long getWriteAvailable() { + throw new UnsupportedOperationException(); + } + + @Override + public long getReadAvailable() { + throw new UnsupportedOperationException(); + } + + @Override + public void addOperationSize(OperationType type, long size) { + } + + @Override + public long getAvgOperationSize(OperationType type) { + return -1; + } + + @Override + public String toString() { + return "NoopQuotaLimiter"; + } + + public static QuotaLimiter get() { + return instance; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java new file mode 100644 index 0000000..6010c13 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; + +/** + * Interface that allows to check the quota available for an operation. + */ [email protected] [email protected] +public interface OperationQuota { + public enum OperationType { + MUTATE, GET, SCAN + } + + /** + * Keeps track of the average data size of operations like get, scan, mutate + */ + public class AvgOperationSize { + private final long[] sizeSum; + private final long[] count; + + public AvgOperationSize() { + int size = OperationType.values().length; + sizeSum = new long[size]; + count = new long[size]; + for (int i = 0; i < size; ++i) { + sizeSum[i] = 0; + count[i] = 0; + } + } + + public void addOperationSize(OperationType type, long size) { + if (size > 0) { + int index = type.ordinal(); + sizeSum[index] += size; + count[index]++; + } + } + + public long getAvgOperationSize(OperationType type) { + int index = type.ordinal(); + return count[index] > 0 ? sizeSum[index] / count[index] : 0; + } + + public long getOperationSize(OperationType type) { + return sizeSum[type.ordinal()]; + } + + public void addGetResult(final Result result) { + long size = QuotaUtil.calculateResultSize(result); + addOperationSize(OperationType.GET, size); + } + + public void addScanResult(final List<Result> results) { + long size = QuotaUtil.calculateResultSize(results); + addOperationSize(OperationType.SCAN, size); + } + + public void addMutation(final Mutation mutation) { + long size = QuotaUtil.calculateMutationSize(mutation); + addOperationSize(OperationType.MUTATE, size); + } + } + + /** + * Checks if it is possible to execute the specified operation. The quota will be estimated based + * on the number of operations to perform and the average size accumulated during time. + * @param numWrites number of write operation that will be performed + * @param numReads number of small-read operation that will be performed + * @param numScans number of long-read operation that will be performed + * @throws ThrottlingException if the operation cannot be performed + */ + void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException; + + /** Cleanup method on operation completion */ + void close(); + + /** + * Add a get result. This will be used to calculate the exact quota and have a better short-read + * average size for the next time. + */ + void addGetResult(Result result); + + /** + * Add a scan result. This will be used to calculate the exact quota and have a better long-read + * average size for the next time. + */ + void addScanResult(List<Result> results); + + /** + * Add a mutation result. This will be used to calculate the exact quota and have a better + * mutation average size for the next time. + */ + void addMutation(Mutation mutation); + + /** @return the number of bytes available to read to avoid exceeding the quota */ + long getReadAvailable(); + + /** @return the number of bytes available to write to avoid exceeding the quota */ + long getWriteAvailable(); + + /** @return the average data size of the specified operation */ + long getAvgOperationSize(OperationType type); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java new file mode 100644 index 0000000..39f1456 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Cache that keeps track of the quota settings for the users and tables that are interacting with + * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will + * be returned and the request to fetch the quota information will be enqueued for the next refresh. + * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss + * events. Later the Quotas will be pushed using the notification system. + */ [email protected] [email protected] +public class QuotaCache implements Stoppable { + private static final Log LOG = LogFactory.getLog(QuotaCache.class); + + public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; + private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min + private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD + + // for testing purpose only, enforce the cache to be always refreshed + private static boolean TEST_FORCE_REFRESH = false; + + private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = + new ConcurrentHashMap<String, QuotaState>(); + private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = + new ConcurrentHashMap<TableName, QuotaState>(); + private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = + new ConcurrentHashMap<String, UserQuotaState>(); + private final RegionServerServices rsServices; + + private QuotaRefresherChore refreshChore; + private boolean stopped = true; + + public QuotaCache(final RegionServerServices rsServices) { + this.rsServices = rsServices; + } + + public void start() throws IOException { + stopped = false; + + // TODO: This will be replaced once we have the notification bus ready. + Configuration conf = rsServices.getConfiguration(); + int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); + refreshChore = new QuotaRefresherChore(period, this); + rsServices.getChoreService().scheduleChore(refreshChore); + } + + @Override + public void stop(final String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + + /** + * Returns the limiter associated to the specified user/table. + * @param ugi the user to limit + * @param table the table to limit + * @return the limiter associated to the specified user/table + */ + public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { + if (table.isSystemTable()) { + return NoopQuotaLimiter.get(); + } + return getUserQuotaState(ugi).getTableLimiter(table); + } + + /** + * Returns the QuotaState associated to the specified user. + * @param ugi the user + * @return the quota info associated to specified user + */ + public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { + String key = ugi.getShortUserName(); + UserQuotaState quotaInfo = userQuotaCache.get(key); + if (quotaInfo == null) { + quotaInfo = new UserQuotaState(); + if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) { + triggerCacheRefresh(); + } + } + return quotaInfo; + } + + /** + * Returns the limiter associated to the specified table. + * @param table the table to limit + * @return the limiter associated to the specified table + */ + public QuotaLimiter getTableLimiter(final TableName table) { + return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter(); + } + + /** + * Returns the limiter associated to the specified namespace. + * @param namespace the namespace to limit + * @return the limiter associated to the specified namespace + */ + public QuotaLimiter getNamespaceLimiter(final String namespace) { + return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter(); + } + + /** + * Returns the QuotaState requested. If the quota info is not in cache an empty one will be + * returned and the quota request will be enqueued for the next cache refresh. + */ + private <K> QuotaState + getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, final K key) { + QuotaState quotaInfo = quotasMap.get(key); + if (quotaInfo == null) { + quotaInfo = new QuotaState(); + if (quotasMap.putIfAbsent(key, quotaInfo) == null) { + triggerCacheRefresh(); + } + } + return quotaInfo; + } + + @VisibleForTesting + void triggerCacheRefresh() { + refreshChore.triggerNow(); + } + + @VisibleForTesting + long getLastUpdate() { + return refreshChore.lastUpdate; + } + + @VisibleForTesting + Map<String, QuotaState> getNamespaceQuotaCache() { + return namespaceQuotaCache; + } + + @VisibleForTesting + Map<TableName, QuotaState> getTableQuotaCache() { + return tableQuotaCache; + } + + @VisibleForTesting + Map<String, UserQuotaState> getUserQuotaCache() { + return userQuotaCache; + } + + public static boolean isTEST_FORCE_REFRESH() { + return TEST_FORCE_REFRESH; + } + + public static void setTEST_FORCE_REFRESH(boolean tEST_FORCE_REFRESH) { + TEST_FORCE_REFRESH = tEST_FORCE_REFRESH; + } + + // TODO: Remove this once we have the notification bus + private class QuotaRefresherChore extends ScheduledChore { + private long lastUpdate = 0; + + public QuotaRefresherChore(final int period, final Stoppable stoppable) { + super("QuotaRefresherChore", stoppable, period); + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES", + justification = "I do not understand why the complaints, it looks good to me -- FIX") + protected void chore() { + // Prefetch online tables/namespaces + for (TableName table : QuotaCache.this.rsServices.getOnlineTables()) { + if (table.isSystemTable()) continue; + if (!QuotaCache.this.tableQuotaCache.contains(table)) { + QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState()); + } + String ns = table.getNamespaceAsString(); + if (!QuotaCache.this.namespaceQuotaCache.contains(ns)) { + QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState()); + } + } + + fetchNamespaceQuotaState(); + fetchTableQuotaState(); + fetchUserQuotaState(); + lastUpdate = EnvironmentEdgeManager.currentTime(); + } + + private void fetchNamespaceQuotaState() { + fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() { + @Override + public Get makeGet(final Map.Entry<String, QuotaState> entry) { + return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey()); + } + + @Override + public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { + return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets); + } + }); + } + + private void fetchTableQuotaState() { + fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() { + @Override + public Get makeGet(final Map.Entry<TableName, QuotaState> entry) { + return QuotaUtil.makeGetForTableQuotas(entry.getKey()); + } + + @Override + public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException { + return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets); + } + }); + } + + private void fetchUserQuotaState() { + final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); + final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); + fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() { + @Override + public Get makeGet(final Map.Entry<String, UserQuotaState> entry) { + return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces); + } + + @Override + public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException { + return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets); + } + }); + } + + private <K, V extends QuotaState> void fetch(final String type, + final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { + long now = EnvironmentEdgeManager.currentTime(); + long refreshPeriod = getPeriod(); + long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; + + // Find the quota entries to update + List<Get> gets = new ArrayList<Get>(); + List<K> toRemove = new ArrayList<K>(); + for (Map.Entry<K, V> entry : quotasMap.entrySet()) { + long lastUpdate = entry.getValue().getLastUpdate(); + long lastQuery = entry.getValue().getLastQuery(); + if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { + toRemove.add(entry.getKey()); + } else if (isTEST_FORCE_REFRESH() || (now - lastUpdate) >= refreshPeriod) { + gets.add(fetcher.makeGet(entry)); + } + } + + for (final K key : toRemove) { + if (LOG.isTraceEnabled()) { + LOG.trace("evict " + type + " key=" + key); + } + quotasMap.remove(key); + } + + // fetch and update the quota entries + if (!gets.isEmpty()) { + try { + for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { + V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); + if (quotaInfo != null) { + quotaInfo.update(entry.getValue()); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); + } + } + } catch (IOException e) { + LOG.warn("Unable to read " + type + " from quota table", e); + } + } + } + } + + static interface Fetcher<Key, Value> { + Get makeGet(Map.Entry<Key, Value> entry); + + Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java new file mode 100644 index 0000000..ffacbc0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; + +/** + * Internal interface used to interact with the user/table quota. + */ [email protected] [email protected] +public interface QuotaLimiter { + /** + * Checks if it is possible to execute the specified operation. + * + * @param estimateWriteSize the write size that will be checked against the available quota + * @param estimateReadSize the read size that will be checked against the available quota + * @throws ThrottlingException thrown if not enough avialable resources to perform operation. + */ + void checkQuota(long estimateWriteSize, long estimateReadSize) + throws ThrottlingException; + + /** + * Removes the specified write and read amount from the quota. + * At this point the write and read amount will be an estimate, + * that will be later adjusted with a consumeWrite()/consumeRead() call. + * + * @param writeSize the write size that will be removed from the current quota + * @param readSize the read size that will be removed from the current quota + */ + void grabQuota(long writeSize, long readSize); + + /** + * Removes or add back some write amount to the quota. + * (called at the end of an operation in case the estimate quota was off) + */ + void consumeWrite(long size); + + /** + * Removes or add back some read amount to the quota. + * (called at the end of an operation in case the estimate quota was off) + */ + void consumeRead(long size); + + /** @return true if the limiter is a noop */ + boolean isBypass(); + + /** @return the number of bytes available to read to avoid exceeding the quota */ + long getReadAvailable(); + + /** @return the number of bytes available to write to avoid exceeding the quota */ + long getWriteAvailable(); + + /** + * Add the average size of the specified operation type. + * The average will be used as estimate for the next operations. + */ + void addOperationSize(OperationType type, long size); + + /** @return the average data size of the specified operation */ + long getAvgOperationSize(OperationType type); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java new file mode 100644 index 0000000..e9bb304 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle; + [email protected] [email protected] +public class QuotaLimiterFactory { + + private QuotaLimiterFactory() { + // utility class + } + + public static QuotaLimiter fromThrottle(final Throttle throttle) { + return TimeBasedLimiter.fromThrottle(throttle); + } + + public static QuotaLimiter update(final QuotaLimiter a, final QuotaLimiter b) { + if (a.getClass().equals(b.getClass()) && a instanceof TimeBasedLimiter) { + ((TimeBasedLimiter) a).update(((TimeBasedLimiter) b)); + return a; + } + throw new UnsupportedOperationException("TODO not implemented yet"); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java new file mode 100644 index 0000000..c015b24 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * In-Memory state of table or namespace quotas + */ [email protected] [email protected] +public class QuotaState { + private long lastUpdate = 0; + private long lastQuery = 0; + private QuotaLimiter globalLimiter = NoopQuotaLimiter.get(); + + public QuotaState() { + this(0); + } + + public QuotaState(final long updateTs) { + lastUpdate = updateTs; + } + + public synchronized long getLastUpdate() { + return lastUpdate; + } + + public synchronized long getLastQuery() { + return lastQuery; + } + + @Override + public synchronized String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("QuotaState(ts=" + getLastUpdate()); + if (isBypass()) { + builder.append(" bypass"); + } else { + if (globalLimiter != NoopQuotaLimiter.get()) { + // builder.append(" global-limiter"); + builder.append(" " + globalLimiter); + } + } + builder.append(')'); + return builder.toString(); + } + + /** + * @return true if there is no quota information associated to this object + */ + public synchronized boolean isBypass() { + return globalLimiter == NoopQuotaLimiter.get(); + } + + /** + * Setup the global quota information. (This operation is part of the QuotaState setup) + */ + public synchronized void setQuotas(final Quotas quotas) { + if (quotas.hasThrottle()) { + globalLimiter = QuotaLimiterFactory.fromThrottle(quotas.getThrottle()); + } else { + globalLimiter = NoopQuotaLimiter.get(); + } + } + + /** + * Perform an update of the quota info based on the other quota info object. (This operation is + * executed by the QuotaCache) + */ + public synchronized void update(final QuotaState other) { + if (globalLimiter == NoopQuotaLimiter.get()) { + globalLimiter = other.globalLimiter; + } else if (other.globalLimiter == NoopQuotaLimiter.get()) { + globalLimiter = NoopQuotaLimiter.get(); + } else { + globalLimiter = QuotaLimiterFactory.update(globalLimiter, other.globalLimiter); + } + lastUpdate = other.lastUpdate; + } + + /** + * Return the limiter associated with this quota. + * @return the quota limiter + */ + public synchronized QuotaLimiter getGlobalLimiter() { + setLastQuery(EnvironmentEdgeManager.currentTime()); + return globalLimiter; + } + + /** + * Return the limiter associated with this quota without updating internal last query stats + * @return the quota limiter + */ + synchronized QuotaLimiter getGlobalLimiterWithoutUpdatingLastQuery() { + return globalLimiter; + } + + public synchronized void setLastQuery(long lastQuery) { + this.lastQuery = lastQuery; + } +} \ No newline at end of file
