This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new f4cbb69410b HBASE-29837 Backport HBASE-27355 Separate meta read
requests from master and client (#7261) (#7652)
f4cbb69410b is described below
commit f4cbb69410b6ebe50c69a77119a819a0c9900c49
Author: Umesh <[email protected]>
AuthorDate: Tue Jan 27 03:21:42 2026 +0530
HBASE-29837 Backport HBASE-27355 Separate meta read requests from master
and client (#7261) (#7652)
* HBASE-27355 Separate meta read requests from master and client (#7261)
Co-authored-by: huiruan <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Reviewed-by: Aman Poonia <[email protected]>
* HBASE-29837 spotless apply
Signed-off-by: Duo Zhang <[email protected]>
Co-authored-by: Ruanhui <[email protected]>
Co-authored-by: ukumawat <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
---
.../org/apache/hadoop/hbase/MetaTableAccessor.java | 19 ++++---
.../java/org/apache/hadoop/hbase/HConstants.java | 2 +
.../hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java | 24 ++++++++-
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 4 ++
.../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 6 ++-
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 59 ++++++++++++----------
6 files changed, 75 insertions(+), 39 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 2ef70daab0a..1819dda048d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -285,6 +285,7 @@ public class MetaTableAccessor {
}
Get get = new Get(row);
get.addFamily(HConstants.CATALOG_FAMILY);
+ get.setPriority(HConstants.INTERNAL_READ_QOS);
Result r = get(getMetaHTable(connection), get);
RegionLocations locations = getRegionLocations(r);
return locations == null
@@ -310,6 +311,7 @@ public class MetaTableAccessor {
throws IOException {
Get get = new Get(getMetaKeyForRegion(ri));
get.addFamily(HConstants.CATALOG_FAMILY);
+ get.setPriority(HConstants.INTERNAL_READ_QOS);
return get(getMetaHTable(connection), get);
}
@@ -339,9 +341,7 @@ public class MetaTableAccessor {
*/
public static Result getRegionResult(Connection connection, RegionInfo
regionInfo)
throws IOException {
- Get get = new Get(getMetaKeyForRegion(regionInfo));
- get.addFamily(HConstants.CATALOG_FAMILY);
- return get(getMetaHTable(connection), get);
+ return getCatalogFamilyRow(connection, regionInfo);
}
/**
@@ -576,6 +576,7 @@ public class MetaTableAccessor {
scan.setReadType(Scan.ReadType.PREAD);
}
scan.setCaching(scannerCaching);
+ scan.setPriority(HConstants.INTERNAL_READ_QOS);
return scan;
}
@@ -776,9 +777,11 @@ public class MetaTableAccessor {
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Scanning META" + " starting at row=" +
Bytes.toStringBinary(startRow)
- + " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" +
rowUpperLimit
- + " with caching=" + scan.getCaching());
+ LOG.trace(
+ "Scanning META starting at row={} stopping at row={} for max={} with
caching={} "
+ + "priority={}",
+ Bytes.toStringBinary(startRow), Bytes.toStringBinary(stopRow),
rowUpperLimit,
+ scan.getCaching(), scan.getPriority());
}
int currentRow = 0;
@@ -1774,7 +1777,7 @@ public class MetaTableAccessor {
addRegionInfo(put, regionInfo);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
putToMetaTable(connection, put);
- LOG.info("Updated row {} with server=",
regionInfo.getRegionNameAsString(), sn);
+ LOG.info("Updated row {} with server = {}",
regionInfo.getRegionNameAsString(), sn);
}
/**
@@ -1899,7 +1902,7 @@ public class MetaTableAccessor {
.setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getAddress().toString())).build())
.add(builder.clear().setRow(p.getRow()).setFamily(getCatalogFamily())
.setQualifier(getStartCodeColumn(replicaId)).setTimestamp(p.getTimestamp())
-
.setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getStartcode())).build())
+
.setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getStartCode())).build())
.add(builder.clear().setRow(p.getRow()).setFamily(getCatalogFamily())
.setQualifier(getSeqNumColumn(replicaId)).setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put).setValue(Bytes.toBytes(openSeqNum)).build());
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c677e610c72..fbe563d60d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1270,6 +1270,8 @@ public final class HConstants {
public static final int ADMIN_QOS = 100;
public static final int HIGH_QOS = 200;
public static final int SYSTEMTABLE_QOS = HIGH_QOS;
+ // QOS for internal meta read requests
+ public static final int INTERNAL_READ_QOS = 250;
/**
* @deprecated the name "META_QOS" is a bit ambiguous, actually only meta
region transition can
* use this priority, and you should not use this directly. Will
be removed in 3.0.0.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
index a86e6554b1c..d82c851347e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -32,7 +33,10 @@ public class MetaRWQueueRpcExecutor extends
RWQueueRpcExecutor {
"hbase.ipc.server.metacallqueue.read.ratio";
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.metacallqueue.scan.ratio";
- public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
+ public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+ "hbase.ipc.server.metacallqueue.handler.factor";
+ public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
+ private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;
public MetaRWQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable
abortable) {
@@ -46,6 +50,22 @@ public class MetaRWQueueRpcExecutor extends
RWQueueRpcExecutor {
@Override
protected float getScanShare(final Configuration conf) {
- return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
+ return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY,
DEFAULT_META_CALL_QUEUE_SCAN_SHARE);
+ }
+
+ @Override
+ public boolean dispatch(CallRunner callTask) {
+ RpcCall call = callTask.getRpcCall();
+ int level = call.getHeader().getPriority();
+ final boolean toWriteQueue = isWriteRequest(call.getHeader(),
call.getParam());
+ // dispatch client system read request to read handlers
+ // dispatch internal system read request to scan handlers
+ final boolean toScanQueue = getNumScanQueues() > 0 && level ==
HConstants.INTERNAL_READ_QOS;
+ return dispatchTo(toWriteQueue, toScanQueue, callTask);
+ }
+
+ @Override
+ protected float getCallQueueHandlerFactor(Configuration conf) {
+ return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index bdb10919bf1..71e46fb5725 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -298,4 +298,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
((ConfigurationObserver) balancer).onConfigurationChange(conf);
}
}
+
+ protected int getNumScanQueues() {
+ return numScanQueues;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 7e5bdfcc7d6..15c9afe030c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -130,7 +130,7 @@ public abstract class RpcExecutor {
this.conf = conf;
this.abortable = abortable;
- float callQueuesHandlersFactor =
this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+ float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
if (
Float.compare(callQueuesHandlersFactor, 1.0f) > 0
|| Float.compare(0.0f, callQueuesHandlersFactor) > 0
@@ -468,4 +468,8 @@ public abstract class RpcExecutor {
}
}
}
+
+ protected float getCallQueueHandlerFactor(Configuration conf) {
+ return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 19aa46a0d62..86f46171673 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -108,7 +108,7 @@ public class TestSimpleRpcScheduler {
RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0,
qosFunction, 0);
scheduler.init(CONTEXT);
scheduler.start();
- CallRunner task = createMockTask();
+ CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
verify(task, timeout(10000)).run();
@@ -163,7 +163,7 @@ public class TestSimpleRpcScheduler {
int totalCallMethods = 10;
for (int i = totalCallMethods; i > 0; i--) {
- CallRunner task = createMockTask();
+ CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
}
@@ -185,9 +185,9 @@ public class TestSimpleRpcScheduler {
@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
- CallRunner generalTask = createMockTask();
- CallRunner priorityTask = createMockTask();
- CallRunner replicationTask = createMockTask();
+ CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
+ CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
+ CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask,
replicationTask);
Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0,
priorityTask,
HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
@@ -227,10 +227,12 @@ public class TestSimpleRpcScheduler {
assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
}
- private CallRunner createMockTask() {
+ private CallRunner createMockTask(int priority) {
ServerCall call = mock(ServerCall.class);
CallRunner task = mock(CallRunner.class);
+ RequestHeader header =
RequestHeader.newBuilder().setPriority(priority).build();
when(task.getRpcCall()).thenReturn(call);
+ when(call.getHeader()).thenReturn(header);
return task;
}
@@ -707,7 +709,7 @@ public class TestSimpleRpcScheduler {
@Test
public void testMetaRWScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
- schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
+
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
0.7f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY,
0.5f);
@@ -728,36 +730,37 @@ public class TestSimpleRpcScheduler {
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getParam()).thenReturn(putCall.param);
- CallRunner getCallTask = mock(CallRunner.class);
- ServerCall getCall = mock(ServerCall.class);
- RequestHeader getHead =
RequestHeader.newBuilder().setMethodName("get").build();
- when(getCallTask.getRpcCall()).thenReturn(getCall);
- when(getCall.getHeader()).thenReturn(getHead);
-
- CallRunner scanCallTask = mock(CallRunner.class);
- ServerCall scanCall = mock(ServerCall.class);
- scanCall.param = ScanRequest.newBuilder().build();
- RequestHeader scanHead =
RequestHeader.newBuilder().setMethodName("scan").build();
- when(scanCallTask.getRpcCall()).thenReturn(scanCall);
- when(scanCall.getHeader()).thenReturn(scanHead);
- when(scanCall.getParam()).thenReturn(scanCall.param);
+ CallRunner clientReadCallTask = mock(CallRunner.class);
+ ServerCall clientReadCall = mock(ServerCall.class);
+ RequestHeader clientReadHead =
RequestHeader.newBuilder().setMethodName("get").build();
+ when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
+ when(clientReadCall.getHeader()).thenReturn(clientReadHead);
+
+ CallRunner internalReadCallTask = mock(CallRunner.class);
+ ServerCall internalReadCall = mock(ServerCall.class);
+ internalReadCall.param = ScanRequest.newBuilder().build();
+ RequestHeader masterReadHead =
RequestHeader.newBuilder().setMethodName("scan")
+ .setPriority(HConstants.INTERNAL_READ_QOS).build();
+ when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
+ when(internalReadCall.getHeader()).thenReturn(masterReadHead);
+ when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
- doAnswerTaskExecution(getCallTask, work, 2, 1000);
- doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+ doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
+ doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
// There are 3 queues: [puts], [gets], [scans]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
- scheduler.dispatch(getCallTask);
- scheduler.dispatch(getCallTask);
- scheduler.dispatch(getCallTask);
- scheduler.dispatch(scanCallTask);
- scheduler.dispatch(scanCallTask);
- scheduler.dispatch(scanCallTask);
+ scheduler.dispatch(clientReadCallTask);
+ scheduler.dispatch(clientReadCallTask);
+ scheduler.dispatch(clientReadCallTask);
+ scheduler.dispatch(internalReadCallTask);
+ scheduler.dispatch(internalReadCallTask);
+ scheduler.dispatch(internalReadCallTask);
while (work.size() < 6) {
Thread.sleep(100);