Repository: hbase
Updated Branches:
  refs/heads/master ecec35ae4 -> ac1a7a4a7


HBASE-15470 Add a setting for Priority queue length

Summary:
Move the config keys to one place.
Add two separate config keys: one for the default call queue length and one for the priority call queue length.

Test Plan: unit tests

Differential Revision: https://reviews.facebook.net/D55575


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ac1a7a4a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ac1a7a4a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ac1a7a4a

Branch: refs/heads/master
Commit: ac1a7a4a78966826d755a21faa007f9ae03f2cd0
Parents: ecec35a
Author: Elliott Clark <ecl...@apache.org>
Authored: Wed Mar 16 10:13:46 2016 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Wed Mar 16 16:52:04 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java  | 3 +--
 .../main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java  | 7 ++++++-
 .../main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java | 5 +++++
 .../org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java     | 9 +++++++--
 4 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index b069a5a..ee36f3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This can be used for HMaster, where no prioritization is needed.
  */
 public class FifoRpcScheduler extends RpcScheduler {
-
   private final int handlerCount;
   private final int maxQueueLength;
   private final AtomicInteger queueSize = new AtomicInteger(0);
@@ -40,7 +39,7 @@ public class FifoRpcScheduler extends RpcScheduler {
 
   public FifoRpcScheduler(Configuration conf, int handlerCount) {
     this.handlerCount = handlerCount;
-    this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
+    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 22cb195..40c11aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -43,6 +43,7 @@ public abstract class RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
 
   protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
+
   protected volatile int currentQueueLimit;
 
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
@@ -219,6 +220,10 @@ public abstract class RpcExecutor {
    * @param conf updated configuration
    */
   public void resizeQueues(Configuration conf) {
-    currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
+    String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
+    if (name != null && name.toLowerCase().contains("priority")) {
+      configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
+    }
+    currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index 50886cb..2414e3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -31,6 +31,11 @@ import java.net.InetSocketAddress;
 @InterfaceStability.Evolving
 public abstract class RpcScheduler {
 
+  public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH =
+      "hbase.ipc.server.max.callqueue.length";
+  public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
+      "hbase.ipc.server.priority.max.callqueue.length";
+
   /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
   static abstract class Context {
     public abstract InetSocketAddress getListenerAddress();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 12ee540..0cd34bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -166,8 +166,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       PriorityFunction priority,
       Abortable server,
       int highPriorityLevel) {
-    int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
+
+    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    int maxPriorityQueueLength =
+        conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
+
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
     this.abortable = server;
@@ -226,7 +230,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
     // Create 2 queues to help priorityExecutor be more scalable.
     this.priorityExecutor = priorityHandlerCount > 0 ?
-        new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null;
+        new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) :
+        null;
 
    this.replicationExecutor =
      replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",

Reply via email to