This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 325f90bb40f HBASE-27144 Add special rpc handlers for bulkload
operations (#4558)
325f90bb40f is described below
commit 325f90bb40f7c3e2e9539e9ec92555cd0ea53ed6
Author: SiCheng-Zheng <[email protected]>
AuthorDate: Sun Jul 17 21:21:58 2022 +0800
HBASE-27144 Add special rpc handlers for bulkload operations (#4558)
Co-authored-by: SiCheng-Zheng <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit ff8eb59709225505be41c73678e907759716a700)
Conflicts:
hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
---
.../java/org/apache/hadoop/hbase/HConstants.java | 9 +++++
.../hadoop/hbase/ipc/MetricsHBaseServerSource.java | 4 ++
.../hbase/ipc/MetricsHBaseServerWrapper.java | 4 ++
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 4 ++
.../apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 10 +++++
.../hbase/ipc/MetricsHBaseServerWrapperImpl.java | 16 ++++++++
.../org/apache/hadoop/hbase/ipc/RpcScheduler.java | 10 +++++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 46 ++++++++++++++++++++--
.../AnnotationReadingPriorityFunction.java | 4 ++
.../hadoop/hbase/ipc/DelegatingRpcScheduler.java | 10 +++++
.../hbase/ipc/MetricsHBaseServerWrapperStub.java | 10 +++++
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 15 ++++++-
12 files changed, 136 insertions(+), 6 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index efe30bb0b35..af347d9e3d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1147,6 +1147,9 @@ public final class HConstants {
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
+ public static final String REGION_SERVER_BULKLOAD_HANDLER_COUNT =
+ "hbase.regionserver.bulkload.handler.count";
+ public static final int DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT = 0;
// Meta Transition handlers to deal with meta
ReportRegionStateTransitionRequest. Meta transition
// should be dealt with in a separate handler in case blocking other
region's transition.
public static final String MASTER_META_TRANSITION_HANDLER_COUNT =
@@ -1233,6 +1236,12 @@ public final class HConstants {
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
+ public static final int BULKLOAD_QOS = 4;
+ /**
+ * @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged
for a long time and
+ * region replication has its own 'replay' method.
+ */
+ @Deprecated
public static final int REPLAY_QOS = 6;
public static final int QOS_THRESHOLD = 10;
public static final int ADMIN_QOS = 100;
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index a1ec313f97a..98ecf8b8d92 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -56,6 +56,8 @@ public interface MetricsHBaseServerSource extends
ExceptionTrackingSource {
String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC = "Number of calls in the replication call
queue waiting to be run";
+ String BULKLOAD_QUEUE_NAME = "numCallsInBulkLoadQueue";
+ String BULKLOAD_QUEUE_DESC = "Number of calls in the bulkload call queue
waiting to be run";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue
waiting to be run";
String METAPRIORITY_QUEUE_DESC = "Number of calls in the priority call queue
waiting to be run";
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
@@ -77,6 +79,8 @@ public interface MetricsHBaseServerSource extends
ExceptionTrackingSource {
String NUM_ACTIVE_PRIORITY_HANDLER_DESC = "Number of active priority rpc
handlers.";
String NUM_ACTIVE_REPLICATION_HANDLER_NAME = "numActiveReplicationHandler";
String NUM_ACTIVE_REPLICATION_HANDLER_DESC = "Number of active replication
rpc handlers.";
+ String NUM_ACTIVE_BULKLOAD_HANDLER_NAME = "numActiveBulkLoadHandler";
+ String NUM_ACTIVE_BULKLOAD_HANDLER_DESC = "Number of active bulkload rpc
handlers.";
String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc
handlers.";
String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 136294883b6..1a8980bbc7b 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -27,6 +27,8 @@ public interface MetricsHBaseServerWrapper {
int getReplicationQueueLength();
+ int getBulkLoadQueueLength();
+
int getPriorityQueueLength();
int getMetaPriorityQueueLength();
@@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper {
int getActiveReplicationRpcHandlerCount();
+ int getActiveBulkLoadRpcHandlerCount();
+
int getActiveMetaPriorityRpcHandlerCount();
long getNumGeneralCallsDropped();
diff --git
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 440ebc6f5a6..9c75f4e6bcb 100644
---
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -143,6 +143,8 @@ public class MetricsHBaseServerSourceImpl extends
ExceptionTrackingSourceImpl
wrapper.getGeneralQueueLength())
.addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC),
wrapper.getReplicationQueueLength())
+ .addGauge(Interns.info(BULKLOAD_QUEUE_NAME, BULKLOAD_QUEUE_DESC),
+ wrapper.getBulkLoadQueueLength())
.addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
wrapper.getPriorityQueueLength())
.addGauge(Interns.info(METAPRIORITY_QUEUE_NAME,
METAPRIORITY_QUEUE_DESC),
@@ -163,6 +165,8 @@ public class MetricsHBaseServerSourceImpl extends
ExceptionTrackingSourceImpl
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
NUM_LIFO_MODE_SWITCHES_DESC),
wrapper.getNumLifoModeSwitches())
.addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC),
wrapper.getWriteQueueLength())
+ .addGauge(Interns.info(NUM_ACTIVE_BULKLOAD_HANDLER_NAME,
NUM_ACTIVE_BULKLOAD_HANDLER_DESC),
+ wrapper.getActiveBulkLoadRpcHandlerCount())
.addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC),
wrapper.getReadQueueLength())
.addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC),
wrapper.getScanQueueLength())
.addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME,
NUM_ACTIVE_WRITE_HANDLER_DESC),
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index b1b2193d5b9..b51154fc24e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -130,6 +130,11 @@ public class FifoRpcScheduler extends RpcScheduler {
return 0;
}
+ @Override
+ public int getBulkLoadQueueLength() {
+ return 0;
+ }
+
@Override
public int getActiveRpcHandlerCount() {
return executor.getActiveCount();
@@ -150,6 +155,11 @@ public class FifoRpcScheduler extends RpcScheduler {
return 0;
}
+ @Override
+ public int getActiveBulkLoadRpcHandlerCount() {
+ return 0;
+ }
+
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return 0;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 0b00bba04fb..857315568c5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -57,6 +57,14 @@ public class MetricsHBaseServerWrapperImpl implements
MetricsHBaseServerWrapper
return server.getScheduler().getReplicationQueueLength();
}
+ @Override
+ public int getBulkLoadQueueLength() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getBulkLoadQueueLength();
+ }
+
@Override
public int getPriorityQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
@@ -121,6 +129,14 @@ public class MetricsHBaseServerWrapperImpl implements
MetricsHBaseServerWrapper
return server.getScheduler().getActiveReplicationRpcHandlerCount();
}
+ @Override
+ public int getActiveBulkLoadRpcHandlerCount() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getActiveBulkLoadRpcHandlerCount();
+ }
+
@Override
public long getNumGeneralCallsDropped() {
if (!isServerStarted() || this.server.getScheduler() == null) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index d81b3224901..f73590a96c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -33,6 +33,10 @@ public abstract class RpcScheduler {
"hbase.ipc.server.max.callqueue.length";
public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.priority.max.callqueue.length";
+ public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
+ "hbase.ipc.server.replication.max.callqueue.length";
+ public static final String IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH =
+ "hbase.ipc.server.bulkload.max.callqueue.length";
/** Exposes runtime information of a {@code RpcServer} that a {@code
RpcScheduler} may need. */
public static abstract class Context {
@@ -76,6 +80,9 @@ public abstract class RpcScheduler {
/** Retrieves length of the replication queue for metrics. */
public abstract int getReplicationQueueLength();
+ /** Retrieves length of the bulkload queue for metrics. */
+ public abstract int getBulkLoadQueueLength();
+
/** Retrieves the total number of active handler. */
public abstract int getActiveRpcHandlerCount();
@@ -91,6 +98,9 @@ public abstract class RpcScheduler {
/** Retrieves the number of active replication handler. */
public abstract int getActiveReplicationRpcHandlerCount();
+ /** Retrieves the number of active bulkload handler. */
+ public abstract int getActiveBulkLoadRpcHandlerCount();
+
/**
* If CoDel-based RPC executors are used, retrieves the number of Calls that
were dropped from
* general queue because RPC executor is under high load; returns 0
otherwise.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index bb8025ca911..f592633e933 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -49,6 +49,8 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
*/
private final RpcExecutor metaTransitionExecutor;
+ private final RpcExecutor bulkloadExecutor;
+
/** What level a high priority call is at. */
private final int highPriorityLevel;
@@ -63,11 +65,17 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
public SimpleRpcScheduler(Configuration conf, int handlerCount, int
priorityHandlerCount,
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction
priority,
Abortable server, int highPriorityLevel) {
-
+ int bulkLoadHandlerCount =
conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
int maxQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
- int maxPriorityQueueLength =
- conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
maxQueueLength);
+ int maxPriorityQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
+ priorityHandlerCount *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ int maxReplicationQueueLength =
+ conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
+ replicationHandlerCount *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ int maxBulkLoadQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
+ bulkLoadHandlerCount *
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
@@ -119,6 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength,
priority, conf,
abortable)
: null;
+ this.bulkloadExecutor = bulkLoadHandlerCount > 0
+ ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ",
bulkLoadHandlerCount,
+ RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength,
priority, conf,
+ abortable)
+ : null;
}
public SimpleRpcScheduler(Configuration conf, int handlerCount, int
priorityHandlerCount,
@@ -170,6 +183,9 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
if (metaTransitionExecutor != null) {
metaTransitionExecutor.start(port);
}
+ if (bulkloadExecutor != null) {
+ bulkloadExecutor.start(port);
+ }
}
@@ -185,6 +201,9 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
if (metaTransitionExecutor != null) {
metaTransitionExecutor.stop();
}
+ if (bulkloadExecutor != null) {
+ bulkloadExecutor.stop();
+ }
}
@@ -205,6 +224,8 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level ==
HConstants.REPLICATION_QOS) {
return replicationExecutor.dispatch(callTask);
+ } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
+ return bulkloadExecutor.dispatch(callTask);
} else {
return callExecutor.dispatch(callTask);
}
@@ -230,10 +251,16 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
return replicationExecutor == null ? 0 :
replicationExecutor.getQueueLength();
}
+ @Override
+ public int getBulkLoadQueueLength() {
+ return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength();
+ }
+
@Override
public int getActiveRpcHandlerCount() {
return callExecutor.getActiveHandlerCount() +
getActivePriorityRpcHandlerCount()
- + getActiveReplicationRpcHandlerCount() +
getActiveMetaPriorityRpcHandlerCount();
+ + getActiveReplicationRpcHandlerCount() +
getActiveMetaPriorityRpcHandlerCount()
+ + getActiveBulkLoadRpcHandlerCount();
}
@Override
@@ -256,6 +283,11 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
return (replicationExecutor == null ? 0 :
replicationExecutor.getActiveHandlerCount());
}
+ @Override
+ public int getActiveBulkLoadRpcHandlerCount() {
+ return bulkloadExecutor == null ? 0 :
bulkloadExecutor.getActiveHandlerCount();
+ }
+
@Override
public long getNumGeneralCallsDropped() {
return callExecutor.getNumGeneralCallsDropped();
@@ -327,6 +359,12 @@ public class SimpleRpcScheduler extends RpcScheduler
implements ConfigurationObs
callQueueInfo.setCallMethodSize(queueName,
metaTransitionExecutor.getCallQueueSizeSummary());
}
+ if (null != bulkloadExecutor) {
+ queueName = "BulkLoad Queue";
+ callQueueInfo.setCallMethodCount(queueName,
bulkloadExecutor.getCallQueueCountsSummary());
+ callQueueInfo.setCallMethodSize(queueName,
bulkloadExecutor.getCallQueueSizeSummary());
+ }
+
return callQueueInfo;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
index cec2f483f8f..5b1fed0880b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
@@ -37,6 +37,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
@@ -183,6 +184,9 @@ public class AnnotationReadingPriorityFunction implements
PriorityFunction {
if (header.hasPriority()) {
return header.getPriority();
}
+ if (param instanceof BulkLoadHFileRequest) {
+ return HConstants.BULKLOAD_QOS;
+ }
String cls = param.getClass().getName();
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
index 7d2836e7c6c..f8ac4a1bb9a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
@@ -44,6 +44,11 @@ public class DelegatingRpcScheduler extends RpcScheduler {
return delegate.getReplicationQueueLength();
}
+ @Override
+ public int getBulkLoadQueueLength() {
+ return delegate.getBulkLoadQueueLength();
+ }
+
@Override
public int getPriorityQueueLength() {
return delegate.getPriorityQueueLength();
@@ -74,6 +79,11 @@ public class DelegatingRpcScheduler extends RpcScheduler {
return delegate.getActiveReplicationRpcHandlerCount();
}
+ @Override
+ public int getActiveBulkLoadRpcHandlerCount() {
+ return delegate.getActiveBulkLoadRpcHandlerCount();
+ }
+
@Override
public boolean dispatch(CallRunner task) {
return delegate.dispatch(task);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index f525c027a40..6e5dfe87fc7 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -33,6 +33,11 @@ public class MetricsHBaseServerWrapperStub implements
MetricsHBaseServerWrapper
return 103;
}
+ @Override
+ public int getBulkLoadQueueLength() {
+ return 109;
+ }
+
@Override
public int getPriorityQueueLength() {
return 104;
@@ -63,6 +68,11 @@ public class MetricsHBaseServerWrapperStub implements
MetricsHBaseServerWrapper
return 203;
}
+ @Override
+ public int getActiveBulkLoadRpcHandlerCount() {
+ return 204;
+ }
+
@Override
public long getNumGeneralCallsDropped() {
return 3;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 79bc97fb07c..0c629231728 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -480,12 +480,21 @@ public class TestSimpleRpcScheduler {
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);
+ CallRunner bulkLoadCallTask = mock(CallRunner.class);
+ ServerCall bulkLoadCall = mock(ServerCall.class);
+ bulkLoadCall.param = ScanRequest.newBuilder().build();
+ RequestHeader bulkLadHead =
RequestHeader.newBuilder().setMethodName("bulkload").build();
+ when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
+ when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
+ when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
+
ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+ doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
- // There are 3 queues: [puts], [gets], [scans]
+ // There are 4 queues: [puts], [gets], [scans], [bulkload]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
@@ -496,7 +505,9 @@ public class TestSimpleRpcScheduler {
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
-
+ scheduler.dispatch(bulkLoadCallTask);
+ scheduler.dispatch(bulkLoadCallTask);
+ scheduler.dispatch(bulkLoadCallTask);
while (work.size() < 6) {
Thread.sleep(100);
}