This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 6b04a7a9e60 HBASE-29141 Calculate default maxQueueLength call queues
correctly (#7490)
6b04a7a9e60 is described below
commit 6b04a7a9e60208c5cce5074fa1ba943b98f8f263
Author: Umesh <[email protected]>
AuthorDate: Tue Jan 27 03:37:54 2026 +0530
HBASE-29141 Calculate default maxQueueLength call queues correctly (#7490)
Co-authored-by: ukumawat <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
---
.../hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java | 4 +-
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 4 +-
.../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 24 ++++-
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 8 +-
.../hadoop/hbase/ipc/TestRWQueueRpcExecutor.java | 13 ++-
.../apache/hadoop/hbase/ipc/TestRpcExecutor.java | 111 +++++++++++++++++++++
6 files changed, 150 insertions(+), 14 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
index d82c851347e..93ccbd854d7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
@@ -35,6 +35,7 @@ public class MetaRWQueueRpcExecutor extends
RWQueueRpcExecutor {
"hbase.ipc.server.metacallqueue.scan.ratio";
public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.metacallqueue.handler.factor";
+ public static final float DEFAULT_META_CALL_QUEUE_HANDLER_FACTOR = 0.5f;
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;
@@ -66,6 +67,7 @@ public class MetaRWQueueRpcExecutor extends
RWQueueRpcExecutor {
@Override
protected float getCallQueueHandlerFactor(Configuration conf) {
- return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
+ return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
+ DEFAULT_META_CALL_QUEUE_HANDLER_FACTOR);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 71e46fb5725..cb8d4164321 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -97,9 +97,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
numScanQueues = scanQueues;
scanHandlersCount = scanHandlers;
- initializeQueues(numWriteQueues);
- initializeQueues(numReadQueues);
- initializeQueues(numScanQueues);
+ initializeQueues(numWriteQueues + numReadQueues + numScanQueues);
this.writeBalancer = getBalancer(name, conf, queues.subList(0,
numWriteQueues));
this.readBalancer =
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 15c9afe030c..eb1eda1f9f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -57,6 +57,8 @@ public abstract class RpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
+ protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f;
+ protected static final int UNDEFINED_MAX_CALLQUEUE_LENGTH = -1;
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
@@ -124,7 +126,7 @@ public abstract class RpcExecutor {
}
public RpcExecutor(final String name, final int handlerCount, final String
callQueueType,
- final int maxQueueLength, final PriorityFunction priority, final
Configuration conf,
+ int maxQueueLength, final PriorityFunction priority, final Configuration
conf,
final Abortable abortable) {
this.name = Strings.nullToEmpty(name);
this.conf = conf;
@@ -153,6 +155,13 @@ public abstract class RpcExecutor {
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
this.handlers = new ArrayList<>(this.handlerCount);
+ // If soft limit of queue is not provided, then calculate using
+ // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER
+ if (maxQueueLength == UNDEFINED_MAX_CALLQUEUE_LENGTH) {
+ int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
+ maxQueueLength = handlerCountPerQueue *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
+ }
+
if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs =
@@ -222,7 +231,16 @@ public abstract class RpcExecutor {
.collect(Collectors.groupingBy(Pair::getFirst,
Collectors.summingLong(Pair::getSecond)));
}
+ // This method can only be called ONCE per executor instance.
+ // Before calling: queueInitArgs[0] contains the soft limit (desired queue
capacity)
+ // After calling: queueInitArgs[0] is set to hard limit and
currentQueueLimit stores the original
+ // soft limit.
+ // Multiple calls would incorrectly use the hard limit as the soft limit.
+ // As all the queues has same initArgs and queueClass, there should be no
need to call this again.
protected void initializeQueues(final int numQueues) {
+ if (!queues.isEmpty()) {
+ throw new RuntimeException("Queues are already initialized");
+ }
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
queueInitArgs[0] = Math.max((int) queueInitArgs[0],
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
@@ -296,7 +314,7 @@ public abstract class RpcExecutor {
*/
private static final QueueBalancer ONE_QUEUE = val -> 0;
- public static QueueBalancer getBalancer(final String executorName, final
Configuration conf,
+ protected static QueueBalancer getBalancer(final String executorName, final
Configuration conf,
final List<BlockingQueue<CallRunner>> queues) {
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must
be at least 1");
if (queues.size() == 1) {
@@ -470,6 +488,6 @@ public abstract class RpcExecutor {
}
protected float getCallQueueHandlerFactor(Configuration conf) {
- return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+ return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 6fcee09ac05..9d9a29d1217 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -68,14 +68,14 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
int bulkLoadHandlerCount =
conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
int maxQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
- handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
int maxPriorityQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
- priorityHandlerCount *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
int maxReplicationQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
- replicationHandlerCount *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
int maxBulkLoadQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
- bulkLoadHandlerCount *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
index 7a7f0e30f5f..0008ea5f44d 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.ipc;
import static
org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY;
import static
org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY;
import static
org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY;
+import static
org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -61,8 +62,9 @@ public class TestRWQueueRpcExecutor {
@Test
public void itProvidesCorrectQueuesToBalancers() throws InterruptedException
{
PriorityFunction qosFunction = mock(PriorityFunction.class);
- RWQueueRpcExecutor executor =
- new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction,
conf, null);
+ int softQueueLimit = 100;
+ RWQueueRpcExecutor executor = new
RWQueueRpcExecutor(testName.getMethodName(), 100,
+ softQueueLimit, qosFunction, conf, null);
QueueBalancer readBalancer = executor.getReadBalancer();
QueueBalancer writeBalancer = executor.getWriteBalancer();
@@ -79,6 +81,11 @@ public class TestRWQueueRpcExecutor {
assertEquals(25, readQueues.size());
assertEquals(50, writeQueues.size());
assertEquals(25, scanQueues.size());
+ assertEquals("Soft limit is not applied properly", softQueueLimit,
executor.currentQueueLimit);
+ // Hard Limit is applied as the max capacity of the queue
+ int hardQueueLimit = readQueues.get(0).remainingCapacity() +
readQueues.get(0).size();
+ assertEquals("Default hard limit should be applied ",
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT,
+ hardQueueLimit);
verifyDistinct(readQueues, writeQueues, scanQueues);
verifyDistinct(writeQueues, readQueues, scanQueues);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java
new file mode 100644
index 00000000000..533aa7bd663
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static
org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY;
+import static
org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_HANDLER_FACTOR;
+import static
org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT;
+import static
org.apache.hadoop.hbase.ipc.RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+@Tag(RPCTests.TAG)
+@Tag(MediumTests.TAG)
+public class TestRpcExecutor {
+
+ private static Configuration conf;
+
+ @BeforeAll
+ public static void setUp() {
+ conf = HBaseConfiguration.create();
+ }
+
+ /**
+ * Test that validates default soft and hard limits when maxQueueLength is
not explicitly
+ * configured (-1).
+ */
+ @Test
+ public void testDefaultQueueLimits(TestInfo testInfo) {
+ PriorityFunction qosFunction = mock(PriorityFunction.class);
+ int handlerCount = 100;
+ // Pass -1 to use default maxQueueLength calculation
+ int defaultMaxQueueLength = -1;
+
+ BalancedQueueRpcExecutor executor =
+ new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName(),
handlerCount,
+ defaultMaxQueueLength, qosFunction, conf, null);
+
+ List<BlockingQueue<CallRunner>> queues = executor.getQueues();
+ int expectedQueueSize = Math.round(handlerCount *
DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
+ Assertions.assertEquals(expectedQueueSize, queues.size(),
+ "Number of queues should be according to default
callQueueHandlerFactor");
+
+ // By default, the soft limit depends on number of handler the queue will
serve
+ int expectedSoftLimit =
+ (handlerCount / expectedQueueSize) *
DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
+ Assertions.assertEquals(expectedSoftLimit, executor.currentQueueLimit,
+ "Soft limit of queues is wrongly calculated");
+
+ // Hard limit should be maximum of softLimit and
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT
+ int hardQueueLimit = queues.get(0).remainingCapacity() +
queues.get(0).size();
+ int expectedHardLimit = Math.max(expectedSoftLimit,
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+ Assertions.assertEquals(expectedHardLimit, hardQueueLimit,
+ "Default hard limit of queues is wrongly calculated ");
+ }
+
+ /**
+ * Test that validates configured soft and hard limits when maxQueueLength
is explicitly set.
+ */
+ @Test
+ public void testConfiguredQueueLimits(TestInfo testInfo) {
+ float callQueueHandlerFactor = 0.2f;
+ conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor);
+ PriorityFunction qosFunction = mock(PriorityFunction.class);
+ int handlerCount = 100;
+ int maxQueueLength = 150;
+
+ BalancedQueueRpcExecutor executor =
+ new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName() +
"1", handlerCount,
+ maxQueueLength, qosFunction, conf, null);
+
+ Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit,
+ "Configured soft limit is not applied.");
+
+ List<BlockingQueue<CallRunner>> queues1 = executor.getQueues();
+
+ int expectedQueueSize = Math.round(handlerCount * callQueueHandlerFactor);
+ Assertions.assertEquals(expectedQueueSize, queues1.size(),
+ "Number of queues should be according to callQueueHandlerFactor");
+
+ int hardQueueLimit1 = queues1.get(0).remainingCapacity() +
queues1.get(0).size();
+ Assertions.assertEquals(DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT,
hardQueueLimit1,
+ "Default Hard limit is not applied");
+
+ }
+}