Repository: hbase Updated Branches: refs/heads/master 48d387413 -> d921262d3
Revert "HBASE-20965 Separate region server report requests to new handlers" This reverts commit 48d387413f012cd6bfecc42085f7432647975780. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d921262d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d921262d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d921262d Branch: refs/heads/master Commit: d921262d389ec7178797b817f0cd8a0a873085ac Parents: 48d3874 Author: Reid Chan <reidc...@apache.org> Authored: Wed Aug 8 10:44:50 2018 +0800 Committer: Reid Chan <reidc...@apache.org> Committed: Wed Aug 8 10:44:50 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HConstants.java | 2 - .../hadoop/hbase/ipc/FifoRpcScheduler.java | 36 ++-- .../hbase/ipc/MasterFifoRpcScheduler.java | 108 ------------ .../hadoop/hbase/master/MasterRpcServices.java | 10 -- .../MasterFifoRpcSchedulerFactory.java | 46 ----- .../hbase/regionserver/RSRpcServices.java | 20 +-- .../hbase/ipc/TestMasterFifoRpcScheduler.java | 168 ------------------- 7 files changed, 14 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 436426f..beb65fa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1022,8 +1022,6 @@ public final class HConstants { public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count"; public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30; - public static final String REGION_SERVER_REPORT_HANDLER_COUNT = - "hbase.regionserver.report.handler.count"; /* * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT: http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index a3fa684..bd8bdce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -40,10 +39,10 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; @InterfaceAudience.Private public class FifoRpcScheduler extends RpcScheduler { private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); - protected final int handlerCount; - protected final int maxQueueLength; - protected final AtomicInteger queueSize = new AtomicInteger(0); - protected ThreadPoolExecutor executor; + private final int handlerCount; + private final int maxQueueLength; + private final AtomicInteger queueSize = new AtomicInteger(0); + private ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; @@ -95,11 +94,6 @@ public class FifoRpcScheduler extends RpcScheduler { @Override public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { - return executeRpcCall(executor, queueSize, task); - } - - protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, - final CallRunner task) { // Executors provide no offer, so make our own. int queued = queueSize.getAndIncrement(); if (maxQueueLength > 0 && queued >= maxQueueLength) { @@ -205,19 +199,15 @@ public class FifoRpcScheduler extends RpcScheduler { callQueueInfo.setCallMethodCount(queueName, methodCount); callQueueInfo.setCallMethodSize(queueName, methodSize); - updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); - - return callQueueInfo; - } - protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue, - HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) { - for (Runnable r : queue) { + for (Runnable r:executor.getQueue()) { FifoCallRunner mcr = (FifoCallRunner) r; RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); - String method = getCallMethod(mcr.getCallRunner()); - if (StringUtil.isNullOrEmpty(method)) { + String method; + + if (null==rpcCall.getMethod() || + StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { method = "Unknown"; } @@ -226,13 +216,7 @@ public class FifoRpcScheduler extends RpcScheduler { methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); } - } - protected String getCallMethod(final CallRunner task) { - RpcCall call = task.getRpcCall(); - if (call != null && call.getMethod() != null) { - return call.getMethod().getName(); - } - return null; + return callQueueInfo; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java deleted file mode 100644 index 01a8dcf..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; -import java.util.HashMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DaemonThreadFactory; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Special rpc scheduler only used for master. - */ -@InterfaceAudience.Private -public class MasterFifoRpcScheduler extends FifoRpcScheduler { - private static final Logger LOG = LoggerFactory.getLogger(MasterFifoRpcScheduler.class); - - private static final String REGION_SERVER_REPORT = "RegionServerReport"; - private final int rsReportHandlerCount; - private final int rsRsreportMaxQueueLength; - private final AtomicInteger rsReportQueueSize = new AtomicInteger(0); - private ThreadPoolExecutor rsReportExecutor; - - public MasterFifoRpcScheduler(Configuration conf, int callHandlerCount, - int rsReportHandlerCount) { - super(conf, callHandlerCount); - this.rsReportHandlerCount = rsReportHandlerCount; - this.rsRsreportMaxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, - rsReportHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - } - - @Override - public void start() { - this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, - new ArrayBlockingQueue<Runnable>(maxQueueLength), - new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), - new ThreadPoolExecutor.CallerRunsPolicy()); - this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, - TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength), - new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), - new ThreadPoolExecutor.CallerRunsPolicy()); - } - - @Override - public void stop() { - this.executor.shutdown(); - this.rsReportExecutor.shutdown(); - } - - @Override - public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { - String method = getCallMethod(task); - if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) { - return executeRpcCall(rsReportExecutor, rsReportQueueSize, task); - } else { - return executeRpcCall(executor, queueSize, task); - } - } - - @Override - public int getGeneralQueueLength() { - return executor.getQueue().size() + rsReportExecutor.getQueue().size(); - } - - @Override - public int getActiveRpcHandlerCount() { - return executor.getActiveCount() + rsReportExecutor.getActiveCount(); - } - - @Override - public CallQueueInfo getCallQueueInfo() { - String queueName = "Master Fifo Queue"; - - HashMap<String, Long> methodCount = new HashMap<>(); - HashMap<String, Long> methodSize = new HashMap<>(); - - CallQueueInfo callQueueInfo = new CallQueueInfo(); - callQueueInfo.setCallMethodCount(queueName, methodCount); - callQueueInfo.setCallMethodSize(queueName, methodSize); - - updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); - updateMethodCountAndSizeByQueue(rsReportExecutor.getQueue(), methodCount, methodSize); - - return callQueueInfo; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 9ebbd3c..a4d9ff8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -336,16 +336,6 @@ public class MasterRpcServices extends RSRpcServices } @Override - protected Class<?> getRpcSchedulerFactoryClass() { - Configuration conf = getConfiguration(); - if (conf != null) { - return conf.getClass(MASTER_RPC_SCHEDULER_FACTORY_CLASS, super.getRpcSchedulerFactoryClass()); - } else { - return super.getRpcSchedulerFactoryClass(); - } - } - - @Override protected RpcServerInterface createRpcServer(Server server, Configuration conf, RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java deleted file mode 100644 index 1e0a4e8..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.PriorityFunction; -import org.apache.hadoop.hbase.ipc.RpcScheduler; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; - -/** - * Factory to use when you want to use the {@link MasterFifoRpcScheduler} - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory { - @Override - public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { - int totalHandlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); - int rsReportHandlerCount = conf.getInt(HConstants.REGION_SERVER_REPORT_HANDLER_COUNT, 0); - if (rsReportHandlerCount == 0) { - rsReportHandlerCount = Math.max(1, totalHandlerCount / 2); - } - int callHandlerCount = Math.max(1, totalHandlerCount - rsReportHandlerCount); - return new MasterFifoRpcScheduler(conf, callHandlerCount, rsReportHandlerCount); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index cb97d35..e292ce1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -94,7 +94,6 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcCallback; -import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerFactory; @@ -258,10 +257,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = "hbase.region.server.rpc.scheduler.factory.class"; - /** RPC scheduler to use for the master. */ - public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = - "hbase.master.rpc.scheduler.factory.class"; - /** * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This * configuration exists to prevent the scenario where a time limit is specified to be so @@ -1208,7 +1203,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); RpcSchedulerFactory rpcSchedulerFactory; try { - rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) + Class<?> cls = rs.conf.getClass( + REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class) .getDeclaredConstructor().newInstance(); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { @@ -1285,11 +1283,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - protected Class<?> getRpcSchedulerFactoryClass() { - return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - SimpleRpcSchedulerFactory.class); - } - @Override public void onConfigurationChange(Configuration newConf) { if (rpcServer instanceof ConfigurationObserver) { @@ -3707,9 +3700,4 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new ServiceException(e); } } - - @VisibleForTesting - public RpcScheduler getRpcScheduler() { - return rpcServer.getScheduler(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d921262d/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java deleted file mode 100644 index 04eb82d..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterRpcServices; -import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; - -@Category({ RPCTests.class, LargeTests.class }) -public class TestMasterFifoRpcScheduler { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class); - - private static final String REGION_SERVER_REPORT = "RegionServerReport"; - private static final String OTHER = "Other"; - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - @BeforeClass - public static void setupBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS, - "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory"); - conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5); - conf.setInt(HConstants.REGION_SERVER_REPORT_HANDLER_COUNT, 2); - TEST_UTIL.startMiniCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testMasterRpcScheduler() { - HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); - MasterRpcServices masterRpcServices = master.getMasterRpcServices(); - RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler(); - Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler); - } - - @Test - public void testCallQueueInfo() throws Exception { - Configuration conf = HBaseConfiguration.create(); - AtomicInteger callExecutionCount = new AtomicInteger(0); - - RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1); - scheduler.start(); - - int totalCallMethods = 30; - int unableToDispatch = 0; - - for (int i = totalCallMethods; i > 0; i--) { - CallRunner task = createMockTask(callExecutionCount, i < 20); - if (!scheduler.dispatch(task)) { - unableToDispatch++; - } - Thread.sleep(10); - } - - CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); - int executionCount = callExecutionCount.get(); - - String expectedQueueName = "Master Fifo Queue"; - assertEquals(1, callQueueInfo.getCallQueueNames().size()); - - long callQueueSize = 0; - for (String queueName : callQueueInfo.getCallQueueNames()) { - assertEquals(expectedQueueName, queueName); - Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName); - if (methodNames.size() == 2) { - assertTrue(methodNames.contains(REGION_SERVER_REPORT)); - assertTrue(methodNames.contains(OTHER)); - } - for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) { - callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName); - } - } - - assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount); - scheduler.stop(); - } - - private CallRunner createMockTask(AtomicInteger callExecutionCount, - boolean isRegionServerReportTask) { - CallRunner task = mock(CallRunner.class); - ServerCall call = mock(ServerCall.class); - when(task.getRpcCall()).thenReturn(call); - when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder() - .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build()); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - callExecutionCount.incrementAndGet(); - Thread.sleep(1000); - return null; - } - }).when(task).run(); - - return task; - } - - private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler { - - public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount, - int rsReportHandlerCount) { - super(conf, callHandlerCount, rsReportHandlerCount); - } - - /** - * Override this method because we can't mock a Descriptors.MethodDescriptor - */ - @Override - protected String getCallMethod(final CallRunner task) { - RpcCall call = task.getRpcCall(); - if (call.getHeader() != null) { - return call.getHeader().getMethodName(); - } - return null; - } - } -}