Repository: hbase Updated Branches: refs/heads/master d8d4ba7c5 -> b3dcfb659
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java index 9fbf4db..140bdbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java @@ -171,4 +171,20 @@ public interface RegionServerObserver extends Coprocessor { default void postReplicateLogEntries( final ObserverContext<RegionServerCoprocessorEnvironment> ctx, List<WALEntry> entries, CellScanner cells) throws IOException {} + + /** + * This will be called before clearing compaction queues + * @param ctx the environment to interact with the framework and region server. + */ + default void preClearCompactionQueues( + final ObserverContext<RegionServerCoprocessorEnvironment> ctx) + throws IOException {} + + /** + * This will be called after clearing compaction queues + * @param ctx the environment to interact with the framework and region server. + */ + default void postClearCompactionQueues( + final ObserverContext<RegionServerCoprocessorEnvironment> ctx) + throws IOException {} } http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 5356ee1..a74c4cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -672,4 +672,12 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi void shutdownLongCompactions(){ this.longCompactions.shutdown(); } + + public void clearLongCompactionsQueue() { + longCompactions.getQueue().clear(); + } + + public void clearShortCompactionsQueue() { + shortCompactions.getQueue().clear(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9f1ef0b..95408b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -31,6 +31,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -110,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; @@ -268,6 +271,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private final long minimumScanTimeLimitDelta; + final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); + /** * An Rpc callback for closing a RegionScanner. */ @@ -1611,6 +1616,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return builder.build(); } + @Override + @QosPriority(priority=HConstants.ADMIN_QOS) + public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, + ClearCompactionQueuesRequest request) throws ServiceException { + LOG.debug("Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress() + + " clear compactions queue"); + ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); + requestCount.increment(); + if (clearCompactionQueues.compareAndSet(false,true)) { + try { + checkOpen(); + regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues(); + for (String queueName : request.getQueueNameList()) { + LOG.debug("clear " + queueName + " compaction queue"); + switch (queueName) { + case "long": + regionServer.compactSplitThread.clearLongCompactionsQueue(); + break; + case "short": + regionServer.compactSplitThread.clearShortCompactionsQueue(); + break; + default: + LOG.warn("Unknown queue name " + queueName); + throw new IOException("Unknown queue name " + queueName); + } + } + regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues(); + } catch (IOException ie) { + throw new ServiceException(ie); + } finally { + clearCompactionQueues.set(false); + } + } else { + LOG.warn("Clear compactions queue is executing by other admin."); + } + return respBuilder.build(); + } + /** * Get some information of the region server. * http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 7732827..9d68d1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -218,6 +218,26 @@ public class RegionServerCoprocessorHost extends }); } + public void preClearCompactionQueues() throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.preClearCompactionQueues(ctx); + } + }); + } + + public void postClearCompactionQueues() throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.postClearCompactionQueues(ctx); + } + }); + } + private <T> T execOperationWithResult(final T defaultValue, final CoprocessOperationWithResult<T> ctx) throws IOException { if (ctx == null) http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index a38d705..8a6eb96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -2668,6 +2668,12 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS } @Override + public void preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx) + throws IOException { + requirePermission(getActiveUser(ctx), "preClearCompactionQueues", Permission.Action.ADMIN); + } + + @Override public void preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx, Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException { requirePermission(getActiveUser(ctx), "moveServersAndTables", Action.ADMIN); http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 78c8214..b8309c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest; @@ -452,6 +454,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, + ClearCompactionQueuesRequest request) throws ServiceException { + return null; + } + + @Override public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request) throws ServiceException { // TODO Auto-generated method stub http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-shell/src/main/ruby/hbase/admin.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index b16c868..8fba82e 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -1215,5 +1215,31 @@ module Hbase set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] end + + #---------------------------------------------------------------------------------------------- + # clear compaction queues + def clear_compaction_queues(server_name, queue_name = nil) + names = ['long', 'short'] + queues = java.util.HashSet.new + if queue_name.nil? + queues.add('long') + queues.add('short') + elsif queue_name.kind_of?(String) + queues.add(queue_name) + if !(names.include?(queue_name)) + raise(ArgumentError, "Unknown queue name #{queue_name}") + end + elsif queue_name.kind_of?(Array) + queue_name.each do |s| + queues.add(s) + if !(names.include?(s)) + raise(ArgumentError, "Unknown queue name #{s}") + end + end + else + raise(ArgumentError, "Unknown queue name #{queue_name}") + end + @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues) + end end end http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-shell/src/main/ruby/shell.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index a6aba76..f395af5 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -357,6 +357,7 @@ Shell.load_command_group( trace splitormerge_switch splitormerge_enabled + clear_compaction_queues ], # TODO remove older hlog_roll command :aliases => { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb b/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb new file mode 100644 index 0000000..21668d3 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb @@ -0,0 +1,41 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class ClearCompactionQueues < Command + def help + return <<-EOF + Clear compacting queues on a regionserver. + The queue_name contains short and long. + short is shortCompactions's queue,long is longCompactions's queue. + + Examples: + hbase> clear_compaction_queues 'host187.example.com,60020' + hbase> clear_compaction_queues 'host187.example.com,60020','long' + hbase> clear_compaction_queues 'host187.example.com,60020', ['long','short'] + EOF + end + + def command(server_name, queue_name = nil) + admin.clear_compaction_queues(server_name, queue_name) + end + end + end +end \ No newline at end of file