This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 325f90bb40f HBASE-27144 Add special rpc handlers for bulkload operations (#4558)
325f90bb40f is described below

commit 325f90bb40f7c3e2e9539e9ec92555cd0ea53ed6
Author: SiCheng-Zheng <[email protected]>
AuthorDate: Sun Jul 17 21:21:58 2022 +0800

    HBASE-27144 Add special rpc handlers for bulkload operations (#4558)
    
    Co-authored-by: SiCheng-Zheng <[email protected]>
    Signed-off-by: Duo Zhang <[email protected]>
    (cherry picked from commit ff8eb59709225505be41c73678e907759716a700)
    
    Conflicts:
            hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
            hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
            hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
            hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  9 +++++
 .../hadoop/hbase/ipc/MetricsHBaseServerSource.java |  4 ++
 .../hbase/ipc/MetricsHBaseServerWrapper.java       |  4 ++
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java    |  4 ++
 .../apache/hadoop/hbase/ipc/FifoRpcScheduler.java  | 10 +++++
 .../hbase/ipc/MetricsHBaseServerWrapperImpl.java   | 16 ++++++++
 .../org/apache/hadoop/hbase/ipc/RpcScheduler.java  | 10 +++++
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       | 46 ++++++++++++++++++++--
 .../AnnotationReadingPriorityFunction.java         |  4 ++
 .../hadoop/hbase/ipc/DelegatingRpcScheduler.java   | 10 +++++
 .../hbase/ipc/MetricsHBaseServerWrapperStub.java   | 10 +++++
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   | 15 ++++++-
 12 files changed, 136 insertions(+), 6 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index efe30bb0b35..af347d9e3d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1147,6 +1147,9 @@ public final class HConstants {
   public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
     "hbase.regionserver.replication.handler.count";
   public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
+  public static final String REGION_SERVER_BULKLOAD_HANDLER_COUNT =
+    "hbase.regionserver.bulkload.handler.count";
+  public static final int DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT = 0;
   // Meta Transition handlers to deal with meta 
ReportRegionStateTransitionRequest. Meta transition
   // should be dealt with in a separate handler in case blocking other 
region's transition.
   public static final String MASTER_META_TRANSITION_HANDLER_COUNT =
@@ -1233,6 +1236,12 @@ public final class HConstants {
   public static final int PRIORITY_UNSET = -1;
   public static final int NORMAL_QOS = 0;
   public static final int REPLICATION_QOS = 5;
+  public static final int BULKLOAD_QOS = 4;
+  /**
+   * @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged 
for a long time and
+   *             region replication has its own 'replay' method.
+   */
+  @Deprecated
   public static final int REPLAY_QOS = 6;
   public static final int QOS_THRESHOLD = 10;
   public static final int ADMIN_QOS = 100;
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index a1ec313f97a..98ecf8b8d92 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -56,6 +56,8 @@ public interface MetricsHBaseServerSource extends 
ExceptionTrackingSource {
   String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue";
   String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
   String REPLICATION_QUEUE_DESC = "Number of calls in the replication call 
queue waiting to be run";
+  String BULKLOAD_QUEUE_NAME = "numCallsInBulkLoadQueue";
+  String BULKLOAD_QUEUE_DESC = "Number of calls in the bulkload call queue 
waiting to be run";
   String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue 
waiting to be run";
   String METAPRIORITY_QUEUE_DESC = "Number of calls in the priority call queue 
waiting to be run";
   String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
@@ -77,6 +79,8 @@ public interface MetricsHBaseServerSource extends 
ExceptionTrackingSource {
   String NUM_ACTIVE_PRIORITY_HANDLER_DESC = "Number of active priority rpc 
handlers.";
   String NUM_ACTIVE_REPLICATION_HANDLER_NAME = "numActiveReplicationHandler";
   String NUM_ACTIVE_REPLICATION_HANDLER_DESC = "Number of active replication 
rpc handlers.";
+  String NUM_ACTIVE_BULKLOAD_HANDLER_NAME = "numActiveBulkLoadHandler";
+  String NUM_ACTIVE_BULKLOAD_HANDLER_DESC = "Number of active bulkload rpc 
handlers.";
   String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
   String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc 
handlers.";
   String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 136294883b6..1a8980bbc7b 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -27,6 +27,8 @@ public interface MetricsHBaseServerWrapper {
 
   int getReplicationQueueLength();
 
+  int getBulkLoadQueueLength();
+
   int getPriorityQueueLength();
 
   int getMetaPriorityQueueLength();
@@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper {
 
   int getActiveReplicationRpcHandlerCount();
 
+  int getActiveBulkLoadRpcHandlerCount();
+
   int getActiveMetaPriorityRpcHandlerCount();
 
   long getNumGeneralCallsDropped();
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 440ebc6f5a6..9c75f4e6bcb 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -143,6 +143,8 @@ public class MetricsHBaseServerSourceImpl extends 
ExceptionTrackingSourceImpl
           wrapper.getGeneralQueueLength())
         .addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC),
           wrapper.getReplicationQueueLength())
+        .addGauge(Interns.info(BULKLOAD_QUEUE_NAME, BULKLOAD_QUEUE_DESC),
+          wrapper.getBulkLoadQueueLength())
         .addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
           wrapper.getPriorityQueueLength())
         .addGauge(Interns.info(METAPRIORITY_QUEUE_NAME, 
METAPRIORITY_QUEUE_DESC),
@@ -163,6 +165,8 @@ public class MetricsHBaseServerSourceImpl extends 
ExceptionTrackingSourceImpl
         .addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME, 
NUM_LIFO_MODE_SWITCHES_DESC),
           wrapper.getNumLifoModeSwitches())
         .addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC), 
wrapper.getWriteQueueLength())
+        .addGauge(Interns.info(NUM_ACTIVE_BULKLOAD_HANDLER_NAME, 
NUM_ACTIVE_BULKLOAD_HANDLER_DESC),
+          wrapper.getActiveBulkLoadRpcHandlerCount())
         .addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC), 
wrapper.getReadQueueLength())
         .addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC), 
wrapper.getScanQueueLength())
         .addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, 
NUM_ACTIVE_WRITE_HANDLER_DESC),
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index b1b2193d5b9..b51154fc24e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -130,6 +130,11 @@ public class FifoRpcScheduler extends RpcScheduler {
     return 0;
   }
 
+  @Override
+  public int getBulkLoadQueueLength() {
+    return 0;
+  }
+
   @Override
   public int getActiveRpcHandlerCount() {
     return executor.getActiveCount();
@@ -150,6 +155,11 @@ public class FifoRpcScheduler extends RpcScheduler {
     return 0;
   }
 
+  @Override
+  public int getActiveBulkLoadRpcHandlerCount() {
+    return 0;
+  }
+
   @Override
   public int getActiveMetaPriorityRpcHandlerCount() {
     return 0;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 0b00bba04fb..857315568c5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -57,6 +57,14 @@ public class MetricsHBaseServerWrapperImpl implements 
MetricsHBaseServerWrapper
     return server.getScheduler().getReplicationQueueLength();
   }
 
+  @Override
+  public int getBulkLoadQueueLength() {
+    if (!isServerStarted() || this.server.getScheduler() == null) {
+      return 0;
+    }
+    return server.getScheduler().getBulkLoadQueueLength();
+  }
+
   @Override
   public int getPriorityQueueLength() {
     if (!isServerStarted() || this.server.getScheduler() == null) {
@@ -121,6 +129,14 @@ public class MetricsHBaseServerWrapperImpl implements 
MetricsHBaseServerWrapper
     return server.getScheduler().getActiveReplicationRpcHandlerCount();
   }
 
+  @Override
+  public int getActiveBulkLoadRpcHandlerCount() {
+    if (!isServerStarted() || this.server.getScheduler() == null) {
+      return 0;
+    }
+    return server.getScheduler().getActiveBulkLoadRpcHandlerCount();
+  }
+
   @Override
   public long getNumGeneralCallsDropped() {
     if (!isServerStarted() || this.server.getScheduler() == null) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index d81b3224901..f73590a96c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -33,6 +33,10 @@ public abstract class RpcScheduler {
     "hbase.ipc.server.max.callqueue.length";
   public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
     "hbase.ipc.server.priority.max.callqueue.length";
+  public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
+    "hbase.ipc.server.replication.max.callqueue.length";
+  public static final String IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH =
+    "hbase.ipc.server.bulkload.max.callqueue.length";
 
   /** Exposes runtime information of a {@code RpcServer} that a {@code 
RpcScheduler} may need. */
   public static abstract class Context {
@@ -76,6 +80,9 @@ public abstract class RpcScheduler {
   /** Retrieves length of the replication queue for metrics. */
   public abstract int getReplicationQueueLength();
 
+  /** Retrieves length of the bulkload queue for metrics. */
+  public abstract int getBulkLoadQueueLength();
+
   /** Retrieves the total number of active handler. */
   public abstract int getActiveRpcHandlerCount();
 
@@ -91,6 +98,9 @@ public abstract class RpcScheduler {
   /** Retrieves the number of active replication handler. */
   public abstract int getActiveReplicationRpcHandlerCount();
 
+  /** Retrieves the number of active bulkload handler. */
+  public abstract int getActiveBulkLoadRpcHandlerCount();
+
   /**
    * If CoDel-based RPC executors are used, retrieves the number of Calls that 
were dropped from
    * general queue because RPC executor is under high load; returns 0 
otherwise.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index bb8025ca911..f592633e933 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -49,6 +49,8 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
    */
   private final RpcExecutor metaTransitionExecutor;
 
+  private final RpcExecutor bulkloadExecutor;
+
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
 
@@ -63,11 +65,17 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
   public SimpleRpcScheduler(Configuration conf, int handlerCount, int 
priorityHandlerCount,
     int replicationHandlerCount, int metaTransitionHandler, PriorityFunction 
priority,
     Abortable server, int highPriorityLevel) {
-
+    int bulkLoadHandlerCount = 
conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
     int maxQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
       handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-    int maxPriorityQueueLength =
-      conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, 
maxQueueLength);
+    int maxPriorityQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
+      priorityHandlerCount * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    int maxReplicationQueueLength =
+      conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
+        replicationHandlerCount * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    int maxBulkLoadQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
+      bulkLoadHandlerCount * 
RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
 
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
@@ -119,6 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
         RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, 
priority, conf,
         abortable)
       : null;
+    this.bulkloadExecutor = bulkLoadHandlerCount > 0
+      ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", 
bulkLoadHandlerCount,
+        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, 
priority, conf,
+        abortable)
+      : null;
   }
 
   public SimpleRpcScheduler(Configuration conf, int handlerCount, int 
priorityHandlerCount,
@@ -170,6 +183,9 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
     if (metaTransitionExecutor != null) {
       metaTransitionExecutor.start(port);
     }
+    if (bulkloadExecutor != null) {
+      bulkloadExecutor.start(port);
+    }
 
   }
 
@@ -185,6 +201,9 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
     if (metaTransitionExecutor != null) {
       metaTransitionExecutor.stop();
     }
+    if (bulkloadExecutor != null) {
+      bulkloadExecutor.stop();
+    }
 
   }
 
@@ -205,6 +224,8 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
       return priorityExecutor.dispatch(callTask);
     } else if (replicationExecutor != null && level == 
HConstants.REPLICATION_QOS) {
       return replicationExecutor.dispatch(callTask);
+    } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
+      return bulkloadExecutor.dispatch(callTask);
     } else {
       return callExecutor.dispatch(callTask);
     }
@@ -230,10 +251,16 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
     return replicationExecutor == null ? 0 : 
replicationExecutor.getQueueLength();
   }
 
+  @Override
+  public int getBulkLoadQueueLength() {
+    return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength();
+  }
+
   @Override
   public int getActiveRpcHandlerCount() {
     return callExecutor.getActiveHandlerCount() + 
getActivePriorityRpcHandlerCount()
-      + getActiveReplicationRpcHandlerCount() + 
getActiveMetaPriorityRpcHandlerCount();
+      + getActiveReplicationRpcHandlerCount() + 
getActiveMetaPriorityRpcHandlerCount()
+      + getActiveBulkLoadRpcHandlerCount();
   }
 
   @Override
@@ -256,6 +283,11 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
     return (replicationExecutor == null ? 0 : 
replicationExecutor.getActiveHandlerCount());
   }
 
+  @Override
+  public int getActiveBulkLoadRpcHandlerCount() {
+    return bulkloadExecutor == null ? 0 : 
bulkloadExecutor.getActiveHandlerCount();
+  }
+
   @Override
   public long getNumGeneralCallsDropped() {
     return callExecutor.getNumGeneralCallsDropped();
@@ -327,6 +359,12 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
       callQueueInfo.setCallMethodSize(queueName, 
metaTransitionExecutor.getCallQueueSizeSummary());
     }
 
+    if (null != bulkloadExecutor) {
+      queueName = "BulkLoad Queue";
+      callQueueInfo.setCallMethodCount(queueName, 
bulkloadExecutor.getCallQueueCountsSummary());
+      callQueueInfo.setCallMethodSize(queueName, 
bulkloadExecutor.getCallQueueSizeSummary());
+    }
+
     return callQueueInfo;
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
index cec2f483f8f..5b1fed0880b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
@@ -183,6 +184,9 @@ public class AnnotationReadingPriorityFunction implements 
PriorityFunction {
     if (header.hasPriority()) {
       return header.getPriority();
     }
+    if (param instanceof BulkLoadHFileRequest) {
+      return HConstants.BULKLOAD_QOS;
+    }
 
     String cls = param.getClass().getName();
     Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
index 7d2836e7c6c..f8ac4a1bb9a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
@@ -44,6 +44,11 @@ public class DelegatingRpcScheduler extends RpcScheduler {
     return delegate.getReplicationQueueLength();
   }
 
+  @Override
+  public int getBulkLoadQueueLength() {
+    return delegate.getBulkLoadQueueLength();
+  }
+
   @Override
   public int getPriorityQueueLength() {
     return delegate.getPriorityQueueLength();
@@ -74,6 +79,11 @@ public class DelegatingRpcScheduler extends RpcScheduler {
     return delegate.getActiveReplicationRpcHandlerCount();
   }
 
+  @Override
+  public int getActiveBulkLoadRpcHandlerCount() {
+    return delegate.getActiveBulkLoadRpcHandlerCount();
+  }
+
   @Override
   public boolean dispatch(CallRunner task) {
     return delegate.dispatch(task);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index f525c027a40..6e5dfe87fc7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -33,6 +33,11 @@ public class MetricsHBaseServerWrapperStub implements 
MetricsHBaseServerWrapper
     return 103;
   }
 
+  @Override
+  public int getBulkLoadQueueLength() {
+    return 109;
+  }
+
   @Override
   public int getPriorityQueueLength() {
     return 104;
@@ -63,6 +68,11 @@ public class MetricsHBaseServerWrapperStub implements 
MetricsHBaseServerWrapper
     return 203;
   }
 
+  @Override
+  public int getActiveBulkLoadRpcHandlerCount() {
+    return 204;
+  }
+
   @Override
   public long getNumGeneralCallsDropped() {
     return 3;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 79bc97fb07c..0c629231728 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -480,12 +480,21 @@ public class TestSimpleRpcScheduler {
       when(scanCall.getHeader()).thenReturn(scanHead);
       when(scanCall.getParam()).thenReturn(scanCall.param);
 
+      CallRunner bulkLoadCallTask = mock(CallRunner.class);
+      ServerCall bulkLoadCall = mock(ServerCall.class);
+      bulkLoadCall.param = ScanRequest.newBuilder().build();
+      RequestHeader bulkLadHead = 
RequestHeader.newBuilder().setMethodName("bulkload").build();
+      when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
+      when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
+      when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
+
       ArrayList<Integer> work = new ArrayList<>();
       doAnswerTaskExecution(putCallTask, work, 1, 1000);
       doAnswerTaskExecution(getCallTask, work, 2, 1000);
       doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+      doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
 
-      // There are 3 queues: [puts], [gets], [scans]
+      // There are 3 queues: [puts], [gets], [scans], [bulkload]
       // so the calls will be interleaved
       scheduler.dispatch(putCallTask);
       scheduler.dispatch(putCallTask);
@@ -496,7 +505,9 @@ public class TestSimpleRpcScheduler {
       scheduler.dispatch(scanCallTask);
       scheduler.dispatch(scanCallTask);
       scheduler.dispatch(scanCallTask);
-
+      scheduler.dispatch(bulkLoadCallTask);
+      scheduler.dispatch(bulkLoadCallTask);
+      scheduler.dispatch(bulkLoadCallTask);
       while (work.size() < 6) {
         Thread.sleep(100);
       }

Reply via email to