Repository: hbase Updated Branches: refs/heads/master ecec35ae4 -> ac1a7a4a7
HBASE-15470 Add a setting for Priority queue length Summary: Move the config keys to one place. Make two different config keys: one for default, one for priority. Test Plan: unit tests Differential Revision: https://reviews.facebook.net/D55575 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ac1a7a4a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ac1a7a4a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ac1a7a4a Branch: refs/heads/master Commit: ac1a7a4a78966826d755a21faa007f9ae03f2cd0 Parents: ecec35a Author: Elliott Clark <ecl...@apache.org> Authored: Wed Mar 16 10:13:46 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Wed Mar 16 16:52:04 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 3 +-- .../main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java | 7 ++++++- .../main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java | 5 +++++ .../org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 9 +++++++-- 4 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index b069a5a..ee36f3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; * This can be used for HMaster, where no prioritization is needed. 
*/ public class FifoRpcScheduler extends RpcScheduler { - private final int handlerCount; private final int maxQueueLength; private final AtomicInteger queueSize = new AtomicInteger(0); @@ -40,7 +39,7 @@ public class FifoRpcScheduler extends RpcScheduler { public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; - this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 22cb195..40c11aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -43,6 +43,7 @@ public abstract class RpcExecutor { private static final Log LOG = LogFactory.getLog(RpcExecutor.class); protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; + protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); @@ -219,6 +220,10 @@ public abstract class RpcExecutor { * @param conf updated configuration */ public void resizeQueues(Configuration conf) { - currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit); + String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; + if (name != null && name.toLowerCase().contains("priority")) { + configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; + } + currentQueueLimit = conf.getInt(configKey, currentQueueLimit); } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 50886cb..2414e3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -31,6 +31,11 @@ import java.net.InetSocketAddress; @InterfaceStability.Evolving public abstract class RpcScheduler { + public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.max.callqueue.length"; + public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.priority.max.callqueue.length"; + /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ static abstract class Context { public abstract InetSocketAddress getListenerAddress(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ac1a7a4a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 12ee540..0cd34bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -166,8 +166,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs PriorityFunction priority, Abortable server, int highPriorityLevel) { - int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + + int maxQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxPriorityQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength); + this.priority = priority; this.highPriorityLevel = highPriorityLevel; this.abortable = server; @@ -226,7 +230,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null; + new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) : + null; this.replicationExecutor = replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",