HBASE-17262 Refactor RpcServer so as to make it extendable and/or pluggable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc93de51 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc93de51 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc93de51 Branch: refs/heads/hbase-12439 Commit: fc93de51aff2c917a2b89694cf16ca37ccde6723 Parents: d787155 Author: binlijin <[email protected]> Authored: Thu Dec 22 14:49:56 2016 +0800 Committer: binlijin <[email protected]> Committed: Thu Dec 22 14:49:56 2016 +0800 ---------------------------------------------------------------------- .../hbase/ipc/IntegrationTestRpcClient.java | 53 +- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 3 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 2127 ++---------------- .../hadoop/hbase/ipc/RpcServerFactory.java | 58 + .../hadoop/hbase/ipc/SimpleRpcServer.java | 1997 ++++++++++++++++ .../hbase/regionserver/RSRpcServices.java | 3 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 85 +- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 2 +- .../hbase/ipc/TestRpcHandlerException.java | 19 +- .../hadoop/hbase/security/TestSecureIPC.java | 3 +- .../security/token/TestTokenAuthentication.java | 11 +- 11 files changed, 2283 insertions(+), 2078 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 7ce86bd..219a4e0 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -25,11 +25,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; - import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -45,20 +40,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.testclassification.IntegrationTests; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Lists; + @Category(IntegrationTests.class) public class IntegrationTestRpcClient { @@ -72,26 +67,6 @@ public class IntegrationTestRpcClient { conf = HBaseConfiguration.create(); } - static class TestRpcServer extends RpcServer { - - TestRpcServer(Configuration conf) throws IOException { - this(new FifoRpcScheduler(conf, 1), conf); - } - - TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { - super(null, "testRpcServer", Lists - .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( - "localhost", 0), conf, scheduler); - } - - @Override - public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); - } - } - protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) { return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) { @Override @@ -116,8 +91,8 @@ public class IntegrationTestRpcClient { class Cluster { Random random = new Random(); ReadWriteLock lock = new ReentrantReadWriteLock(); - HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>(); - List<TestRpcServer> serverList = new ArrayList<>(); + HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>(); + List<RpcServer> serverList = new ArrayList<>(); int maxServers; int minServers; @@ -126,14 +101,18 @@ public class IntegrationTestRpcClient { this.maxServers = maxServers; } - TestRpcServer startServer() throws IOException { + RpcServer startServer() throws IOException { lock.writeLock().lock(); try { if (rpcServers.size() >= maxServers) { return null; } - TestRpcServer rpcServer = new TestRpcServer(conf); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists + .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler( + conf, 1)); rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { @@ -150,7 +129,7 @@ public class IntegrationTestRpcClient { void stopRandomServer() throws Exception { lock.writeLock().lock(); - TestRpcServer rpcServer = null; + RpcServer rpcServer = null; try { if (rpcServers.size() <= minServers) { return; @@ -174,7 +153,7 @@ public class IntegrationTestRpcClient { } } - void stopServer(TestRpcServer rpcServer) throws InterruptedException { + void stopServer(RpcServer rpcServer) throws InterruptedException { InetSocketAddress address = rpcServer.getListenerAddress(); LOG.info("Stopping server: " + address); rpcServer.stop(); @@ -185,7 +164,7 @@ public class IntegrationTestRpcClient { void stopRunning() throws InterruptedException { lock.writeLock().lock(); try { - for (TestRpcServer rpcServer : serverList) { + for (RpcServer rpcServer : serverList) { stopServer(rpcServer); } @@ -194,7 +173,7 @@ public class IntegrationTestRpcClient { } } - TestRpcServer getRandomServer() { + RpcServer getRandomServer() { lock.readLock().lock(); try { int size = rpcServers.size(); @@ -278,7 +257,7 @@ public class IntegrationTestRpcClient { String message = isBigPayload ? BIG_PAYLOAD : id + numCalls; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); EchoResponseProto ret; - TestRpcServer server = cluster.getRandomServer(); + RpcServer server = cluster.getRandomServer(); try { sending.set(true); BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress()); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 5301a67..0aabc10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -1,4 +1,3 @@ -package org.apache.hadoop.hbase.ipc; /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -16,6 +15,8 @@ package org.apache.hadoop.hbase.ipc; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.hadoop.hbase.ipc; + import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException;
