Repository: hbase
Updated Branches:
  refs/heads/master d8d4ba7c5 -> b3dcfb659


http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index 9fbf4db..140bdbe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -171,4 +171,20 @@ public interface RegionServerObserver extends Coprocessor {
   default void postReplicateLogEntries(
       final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
       List<WALEntry> entries, CellScanner cells) throws IOException {}
+
+  /**
+   * This will be called before clearing compaction queues
+   * @param ctx the environment to interact with the framework and region 
server.
+   */
+  default void preClearCompactionQueues(
+      final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
+      throws IOException {}
+
+  /**
+   * This will be called after clearing compaction queues
+   * @param ctx the environment to interact with the framework and region 
server.
+   */
+  default void postClearCompactionQueues(
+      final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
+      throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 5356ee1..a74c4cb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -672,4 +672,12 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
   void shutdownLongCompactions(){
     this.longCompactions.shutdown();
   }
+
+  public void clearLongCompactionsQueue() {
+    longCompactions.getQueue().clear();
+  }
+
+  public void clearShortCompactionsQueue() {
+    shortCompactions.getQueue().clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9f1ef0b..95408b7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -31,6 +31,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -110,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
@@ -268,6 +271,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private final long minimumScanTimeLimitDelta;
 
+  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
+
   /**
    * An Rpc callback for closing a RegionScanner.
    */
@@ -1611,6 +1616,44 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     return builder.build();
   }
 
+  @Override
+  @QosPriority(priority=HConstants.ADMIN_QOS)
+  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController 
controller,
+    ClearCompactionQueuesRequest request) throws ServiceException {
+    LOG.debug("Client=" + RpcServer.getRequestUserName() + "/" + 
RpcServer.getRemoteAddress()
+            + " clear compactions queue");
+    ClearCompactionQueuesResponse.Builder respBuilder = 
ClearCompactionQueuesResponse.newBuilder();
+    requestCount.increment();
+    if (clearCompactionQueues.compareAndSet(false,true)) {
+      try {
+        checkOpen();
+        
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
+        for (String queueName : request.getQueueNameList()) {
+          LOG.debug("clear " + queueName + " compaction queue");
+          switch (queueName) {
+            case "long":
+              regionServer.compactSplitThread.clearLongCompactionsQueue();
+              break;
+            case "short":
+              regionServer.compactSplitThread.clearShortCompactionsQueue();
+              break;
+            default:
+              LOG.warn("Unknown queue name " + queueName);
+              throw new IOException("Unknown queue name " + queueName);
+          }
+        }
+        
regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
+      } catch (IOException ie) {
+        throw new ServiceException(ie);
+      } finally {
+        clearCompactionQueues.set(false);
+      }
+    } else {
+      LOG.warn("Clear compactions queue is executing by other admin.");
+    }
+    return respBuilder.build();
+  }
+
   /**
    * Get some information of the region server.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 7732827..9d68d1b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -218,6 +218,26 @@ public class RegionServerCoprocessorHost extends
         });
   }
 
+  public void preClearCompactionQueues() throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(RegionServerObserver oserver,
+                       ObserverContext<RegionServerCoprocessorEnvironment> 
ctx) throws IOException {
+        oserver.preClearCompactionQueues(ctx);
+      }
+    });
+  }
+
+  public void postClearCompactionQueues() throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(RegionServerObserver oserver,
+                       ObserverContext<RegionServerCoprocessorEnvironment> 
ctx) throws IOException {
+        oserver.postClearCompactionQueues(ctx);
+      }
+    });
+  }
+
   private <T> T execOperationWithResult(final T defaultValue,
       final CoprocessOperationWithResult<T> ctx) throws IOException {
     if (ctx == null)

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index a38d705..8a6eb96 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -2668,6 +2668,12 @@ public class AccessController implements MasterObserver, 
RegionObserver, RegionS
   }
 
   @Override
+  public void  
preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> 
ctx)
+          throws IOException {
+    requirePermission(getActiveUser(ctx), "preClearCompactionQueues", 
Permission.Action.ADMIN);
+  }
+
+  @Override
   public void 
preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
       Set<Address> servers, Set<TableName> tables, String targetGroup) throws 
IOException {
     requirePermission(getActiveUser(ctx), "moveServersAndTables", 
Action.ADMIN);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 78c8214..b8309c7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
@@ -452,6 +454,12 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   }
 
   @Override
+  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController 
controller,
+    ClearCompactionQueuesRequest request) throws ServiceException {
+    return null;
+  }
+
+  @Override
   public GetStoreFileResponse getStoreFile(RpcController controller,
       GetStoreFileRequest request) throws ServiceException {
     // TODO Auto-generated method stub

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb 
b/hbase-shell/src/main/ruby/hbase/admin.rb
index b16c868..8fba82e 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -1215,5 +1215,31 @@ module Hbase
       set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
       set_descriptor_config(htd, arg.delete(CONFIGURATION)) if 
arg[CONFIGURATION]
     end
+
+    
#----------------------------------------------------------------------------------------------
+    # clear compaction queues
+    def clear_compaction_queues(server_name, queue_name = nil)
+      names = ['long', 'short']
+      queues = java.util.HashSet.new
+      if queue_name.nil?
+        queues.add('long')
+        queues.add('short')
+      elsif queue_name.kind_of?(String)
+        queues.add(queue_name)
+        if !(names.include?(queue_name))
+          raise(ArgumentError, "Unknown queue name #{queue_name}")
+        end
+      elsif queue_name.kind_of?(Array)
+        queue_name.each do |s|
+          queues.add(s)
+          if !(names.include?(s))
+            raise(ArgumentError, "Unknown queue name #{s}")
+          end
+        end
+      else
+        raise(ArgumentError, "Unknown queue name #{queue_name}")
+      end
+      @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues)
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb 
b/hbase-shell/src/main/ruby/shell.rb
index a6aba76..f395af5 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -357,6 +357,7 @@ Shell.load_command_group(
     trace
     splitormerge_switch
     splitormerge_enabled
+    clear_compaction_queues
   ],
   # TODO remove older hlog_roll command
   :aliases => {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3dcfb65/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb
----------------------------------------------------------------------
diff --git 
a/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb 
b/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb
new file mode 100644
index 0000000..21668d3
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/clear_compaction_queues.rb
@@ -0,0 +1,41 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class ClearCompactionQueues < Command
+      def help
+        return <<-EOF
+          Clear compacting queues on a regionserver.
+          The queue_name contains short and long.
+          short is shortCompactions's queue,long is longCompactions's queue.
+
+          Examples:
+          hbase> clear_compaction_queues 'host187.example.com,60020'
+          hbase> clear_compaction_queues 'host187.example.com,60020','long'
+          hbase> clear_compaction_queues 'host187.example.com,60020', 
['long','short']
+        EOF
+      end
+
+      def command(server_name, queue_name = nil)
+        admin.clear_compaction_queues(server_name, queue_name)
+      end
+    end
+  end
+end
\ No newline at end of file

Reply via email to