HBASE-20499 Replication/Priority executors can use specific max queue length as default value instead of general maxQueueLength

Signed-off-by: tedyu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d080762
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d080762
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d080762

Branch: refs/heads/HBASE-19064
Commit: 6d080762ef795adf02dd0ab236c4b3eb73e19a91
Parents: a136303
Author: Nihal Jain <[email protected]>
Authored: Fri Apr 27 14:13:57 2018 +0530
Committer: tedyu <[email protected]>
Committed: Mon Apr 30 07:42:32 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java    |  8 ++++++--
 .../org/apache/hadoop/hbase/ipc/RpcScheduler.java   |  2 ++
 .../apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 16 +++++++++++-----
 3 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6d080762/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 7470758..f63b243 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -486,8 +486,12 @@ public abstract class RpcExecutor {
    */
   public void resizeQueues(Configuration conf) {
     String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
-    if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
-      configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
+    if (name != null) {
+      if (name.toLowerCase(Locale.ROOT).contains("priority")) {
+        configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
+      } else if (name.toLowerCase(Locale.ROOT).contains("replication")) {
+        configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
+      }
     }
     currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d080762/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index be54e54..e1fe397 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -35,6 +35,8 @@ public abstract class RpcScheduler {
       "hbase.ipc.server.max.callqueue.length";
   public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
       "hbase.ipc.server.priority.max.callqueue.length";
+  public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
+      "hbase.ipc.server.replication.max.callqueue.length";
 
   /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
   public static abstract class Context {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d080762/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 47c1498..725a93a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -65,8 +65,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
     int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-    int maxPriorityQueueLength =
-        conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
+    int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
+      priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    int maxReplicationQueueLength =
+        conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
+          replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
 
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
@@ -94,9 +97,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
     this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
         "priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
         maxPriorityQueueLength, priority, conf, abortable) : null;
-    this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
-        "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
-        maxQueueLength, priority, conf, abortable) : null;
+    this.replicationExecutor =
+        replicationHandlerCount > 0
+            ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
+                RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority,
+                conf, abortable)
+            : null;
   }
 
 

Reply via email to