This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 43dbd893c74 HBASE-29141 Calculate default maxQueueLength call queues correctly (#7490)
43dbd893c74 is described below

commit 43dbd893c74044d7f0ee9beddbce1239c0830231
Author: Umesh <[email protected]>
AuthorDate: Tue Jan 27 03:37:54 2026 +0530

    HBASE-29141 Calculate default maxQueueLength call queues correctly (#7490)
    
    Co-authored-by: ukumawat <[email protected]>
    Signed-off-by: Andrew Purtell <[email protected]>
---
 .../hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java   |   4 +-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java       |   4 +-
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   |  24 ++++-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       |   8 +-
 .../hadoop/hbase/ipc/TestRWQueueRpcExecutor.java   |  13 ++-
 .../apache/hadoop/hbase/ipc/TestRpcExecutor.java   | 111 +++++++++++++++++++++
 6 files changed, 150 insertions(+), 14 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
index d82c851347e..93ccbd854d7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
@@ -35,6 +35,7 @@ public class MetaRWQueueRpcExecutor extends 
RWQueueRpcExecutor {
     "hbase.ipc.server.metacallqueue.scan.ratio";
   public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
     "hbase.ipc.server.metacallqueue.handler.factor";
+  public static final float DEFAULT_META_CALL_QUEUE_HANDLER_FACTOR = 0.5f;
   public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
   private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;
 
@@ -66,6 +67,7 @@ public class MetaRWQueueRpcExecutor extends 
RWQueueRpcExecutor {
 
   @Override
   protected float getCallQueueHandlerFactor(Configuration conf) {
-    return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
+    return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
+      DEFAULT_META_CALL_QUEUE_HANDLER_FACTOR);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 71e46fb5725..cb8d4164321 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -97,9 +97,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     numScanQueues = scanQueues;
     scanHandlersCount = scanHandlers;
 
-    initializeQueues(numWriteQueues);
-    initializeQueues(numReadQueues);
-    initializeQueues(numScanQueues);
+    initializeQueues(numWriteQueues + numReadQueues + numScanQueues);
 
     this.writeBalancer = getBalancer(name, conf, queues.subList(0, 
numWriteQueues));
     this.readBalancer =
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 15c9afe030c..eb1eda1f9f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -57,6 +57,8 @@ public abstract class RpcExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
 
   protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
+  protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f;
+  protected static final int UNDEFINED_MAX_CALLQUEUE_LENGTH = -1;
   public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
     "hbase.ipc.server.callqueue.handler.factor";
 
@@ -124,7 +126,7 @@ public abstract class RpcExecutor {
   }
 
   public RpcExecutor(final String name, final int handlerCount, final String 
callQueueType,
-    final int maxQueueLength, final PriorityFunction priority, final 
Configuration conf,
+    int maxQueueLength, final PriorityFunction priority, final Configuration 
conf,
     final Abortable abortable) {
     this.name = Strings.nullToEmpty(name);
     this.conf = conf;
@@ -153,6 +155,13 @@ public abstract class RpcExecutor {
     this.handlerCount = Math.max(handlerCount, this.numCallQueues);
     this.handlers = new ArrayList<>(this.handlerCount);
 
+    // If soft limit of queue is not provided, then calculate using
+    // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER
+    if (maxQueueLength == UNDEFINED_MAX_CALLQUEUE_LENGTH) {
+      int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
+      maxQueueLength = handlerCountPerQueue * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
+    }
+
     if (isDeadlineQueueType(callQueueType)) {
       this.name += ".Deadline";
       this.queueInitArgs =
@@ -222,7 +231,16 @@ public abstract class RpcExecutor {
       .collect(Collectors.groupingBy(Pair::getFirst, 
Collectors.summingLong(Pair::getSecond)));
   }
 
+  // This method can only be called ONCE per executor instance.
+  // Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity).
+  // After calling: queueInitArgs[0] is set to the hard limit and currentQueueLimit stores the
+  // original soft limit.
+  // Multiple calls would incorrectly use the hard limit as the soft limit.
+  // As all the queues have the same initArgs and queueClass, there should be no need to call this again.
   protected void initializeQueues(final int numQueues) {
+    if (!queues.isEmpty()) {
+      throw new RuntimeException("Queues are already initialized");
+    }
     if (queueInitArgs.length > 0) {
       currentQueueLimit = (int) queueInitArgs[0];
       queueInitArgs[0] = Math.max((int) queueInitArgs[0], 
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
@@ -296,7 +314,7 @@ public abstract class RpcExecutor {
    */
   private static final QueueBalancer ONE_QUEUE = val -> 0;
 
-  public static QueueBalancer getBalancer(final String executorName, final 
Configuration conf,
+  protected static QueueBalancer getBalancer(final String executorName, final 
Configuration conf,
     final List<BlockingQueue<CallRunner>> queues) {
     Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must 
be at least 1");
     if (queues.size() == 1) {
@@ -470,6 +488,6 @@ public abstract class RpcExecutor {
   }
 
   protected float getCallQueueHandlerFactor(Configuration conf) {
-    return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+    return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 
DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 6fcee09ac05..9d9a29d1217 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -68,14 +68,14 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
     int bulkLoadHandlerCount = 
conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
       HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
     int maxQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
-      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+      RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
     int maxPriorityQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
-      priorityHandlerCount * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+      RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
     int maxReplicationQueueLength =
       conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
-        replicationHandlerCount * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+        RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
     int maxBulkLoadQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
-      bulkLoadHandlerCount * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+      RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
 
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
index 7a7f0e30f5f..0008ea5f44d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.ipc;
 import static 
org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY;
 import static 
org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY;
 import static 
org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY;
+import static 
org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
 
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -61,8 +62,9 @@ public class TestRWQueueRpcExecutor {
   @Test
   public void itProvidesCorrectQueuesToBalancers() throws InterruptedException 
{
     PriorityFunction qosFunction = mock(PriorityFunction.class);
-    RWQueueRpcExecutor executor =
-      new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction, 
conf, null);
+    int softQueueLimit = 100;
+    RWQueueRpcExecutor executor = new 
RWQueueRpcExecutor(testName.getMethodName(), 100,
+      softQueueLimit, qosFunction, conf, null);
 
     QueueBalancer readBalancer = executor.getReadBalancer();
     QueueBalancer writeBalancer = executor.getWriteBalancer();
@@ -79,6 +81,11 @@ public class TestRWQueueRpcExecutor {
     assertEquals(25, readQueues.size());
     assertEquals(50, writeQueues.size());
     assertEquals(25, scanQueues.size());
+    assertEquals("Soft limit is not applied properly", softQueueLimit, 
executor.currentQueueLimit);
+    // Hard Limit is applied as the max capacity of the queue
+    int hardQueueLimit = readQueues.get(0).remainingCapacity() + 
readQueues.get(0).size();
+    assertEquals("Default hard limit should be applied ", 
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT,
+      hardQueueLimit);
 
     verifyDistinct(readQueues, writeQueues, scanQueues);
     verifyDistinct(writeQueues, readQueues, scanQueues);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java
new file mode 100644
index 00000000000..533aa7bd663
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static 
org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY;
+import static 
org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_HANDLER_FACTOR;
+import static 
org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT;
+import static 
org.apache.hadoop.hbase.ipc.RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+@Tag(RPCTests.TAG)
+@Tag(MediumTests.TAG)
+public class TestRpcExecutor {
+
+  private static Configuration conf;
+
+  @BeforeAll
+  public static void setUp() {
+    conf = HBaseConfiguration.create();
+  }
+
+  /**
+   * Test that validates default soft and hard limits when maxQueueLength is 
not explicitly
+   * configured (-1).
+   */
+  @Test
+  public void testDefaultQueueLimits(TestInfo testInfo) {
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    int handlerCount = 100;
+    // Pass -1 to use default maxQueueLength calculation
+    int defaultMaxQueueLength = -1;
+
+    BalancedQueueRpcExecutor executor =
+      new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName(), 
handlerCount,
+        defaultMaxQueueLength, qosFunction, conf, null);
+
+    List<BlockingQueue<CallRunner>> queues = executor.getQueues();
+    int expectedQueueSize = Math.round(handlerCount * 
DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
+    Assertions.assertEquals(expectedQueueSize, queues.size(),
+      "Number of queues should be according to default 
callQueueHandlerFactor");
+
+    // By default, the soft limit depends on the number of handlers the queue will serve
+    int expectedSoftLimit =
+      (handlerCount / expectedQueueSize) * 
DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
+    Assertions.assertEquals(expectedSoftLimit, executor.currentQueueLimit,
+      "Soft limit of queues is wrongly calculated");
+
+    // Hard limit should be the maximum of softLimit and DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT
+    int hardQueueLimit = queues.get(0).remainingCapacity() + 
queues.get(0).size();
+    int expectedHardLimit = Math.max(expectedSoftLimit, 
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+    Assertions.assertEquals(expectedHardLimit, hardQueueLimit,
+      "Default hard limit of queues is wrongly calculated ");
+  }
+
+  /**
+   * Test that validates configured soft and hard limits when maxQueueLength 
is explicitly set.
+   */
+  @Test
+  public void testConfiguredQueueLimits(TestInfo testInfo) {
+    float callQueueHandlerFactor = 0.2f;
+    conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor);
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    int handlerCount = 100;
+    int maxQueueLength = 150;
+
+    BalancedQueueRpcExecutor executor =
+      new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName() + 
"1", handlerCount,
+        maxQueueLength, qosFunction, conf, null);
+
+    Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit,
+      "Configured soft limit is not applied.");
+
+    List<BlockingQueue<CallRunner>> queues1 = executor.getQueues();
+
+    int expectedQueueSize = Math.round(handlerCount * callQueueHandlerFactor);
+    Assertions.assertEquals(expectedQueueSize, queues1.size(),
+      "Number of queues should be according to callQueueHandlerFactor");
+
+    int hardQueueLimit1 = queues1.get(0).remainingCapacity() + 
queues1.get(0).size();
+    Assertions.assertEquals(DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, 
hardQueueLimit1,
+      "Default Hard limit is not applied");
+
+  }
+}

Reply via email to