http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 5a9178a..a1a73c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -46,7 +46,9 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -55,7 +57,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; @@ -77,27 +78,6 @@ public abstract class AbstractTestIPC { static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); static final Configuration CONF = HBaseConfiguration.create(); - /** - * Instance of server. We actually don't do anything speical in here so could just use - * HBaseRpcServer directly. - */ - static class TestRpcServer extends RpcServer { - - TestRpcServer() throws IOException { - this(new FifoRpcScheduler(CONF, 1), CONF); - } - - TestRpcServer(Configuration conf) throws IOException { - this(new FifoRpcScheduler(conf, 1), conf); - } - - TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { - super(null, "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), conf, scheduler); - } - } - protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); /** @@ -106,7 +86,10 @@ public abstract class AbstractTestIPC { @Test public void testNoCodec() throws IOException, ServiceException { Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -136,7 +119,10 @@ public abstract class AbstractTestIPC { for (int i = 0; i < count; i++) { cells.add(CELL); } - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -163,7 +149,10 @@ public abstract class AbstractTestIPC { @Test public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -183,7 +172,10 @@ public abstract class AbstractTestIPC { @Test public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); - RpcServer rpcServer = new TestRpcServer(scheduler, CONF); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, scheduler); verify(scheduler).init((RpcScheduler.Context) anyObject()); try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); @@ -205,7 +197,10 @@ public abstract class AbstractTestIPC { public void testRpcMaxRequestSize() throws IOException, ServiceException { Configuration conf = new Configuration(CONF); conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); - RpcServer rpcServer = new TestRpcServer(conf); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), conf, + new FifoRpcScheduler(conf, 1)); try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -236,7 +231,10 @@ public abstract class AbstractTestIPC { @Test public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, ServiceException { - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); @@ -250,7 +248,10 @@ public abstract class AbstractTestIPC { @Test public void testRemoteError() throws IOException, ServiceException { - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -267,7 +268,10 @@ public abstract class AbstractTestIPC { @Test public void testTimeout() throws IOException { - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -295,7 +299,7 @@ public abstract class AbstractTestIPC { } } - static class TestFailingRpcServer extends TestRpcServer { + static class TestFailingRpcServer extends SimpleRpcServer { TestFailingRpcServer() throws IOException { this(new FifoRpcScheduler(CONF, 1), CONF); @@ -306,7 +310,9 @@ public abstract class AbstractTestIPC { } TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { - super(scheduler, conf); + super(null, "testRpcServer", Lists + .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), conf, scheduler); } class FailingConnection extends Connection { @@ -349,7 +355,10 @@ public abstract class AbstractTestIPC { @Test public void testAsyncEcho() throws IOException { Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); Interface stub = newStub(client, rpcServer.getListenerAddress()); @@ -377,7 +386,10 @@ public abstract class AbstractTestIPC { @Test public void testAsyncRemoteError() throws IOException { AbstractRpcClient<?> client = createRpcClient(CONF); - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try { rpcServer.start(); Interface stub = newStub(client, rpcServer.getListenerAddress()); @@ -398,7 +410,10 @@ public abstract class AbstractTestIPC { @Test public void testAsyncTimeout() throws IOException { - TestRpcServer rpcServer = new TestRpcServer(); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, + "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); Interface stub = newStub(client, rpcServer.getListenerAddress());
http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 9a02d5b..b039003 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -65,7 +65,7 @@ public class TestProtoBufRpc { log.setLevel(Level.TRACE); // Create server side implementation // Get RPC server for server side implementation - this.server = new RpcServer(null, "testrpc", + this.server = RpcServerFactory.createRpcServer(null, "testrpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); InetSocketAddress address = server.getListenerAddress(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index 8eed01c..449899f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.testclassification.RPCTests; @@ -36,6 +37,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; + import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; @Category({ RPCTests.class, SmallTests.class }) @@ -44,19 +46,6 @@ public class TestRpcHandlerException { private final static Configuration CONF = HBaseConfiguration.create(); /** - * Instance of server. We actually don't do anything speical in here so could just use - * HBaseRpcServer directly. - */ - private static class TestRpcServer extends RpcServer { - - TestRpcServer(RpcScheduler scheduler) throws IOException { - super(null, "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); - } - } - - /** * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the * client will hang and the test will fail. The test is meant to be a unit test to test the * behavior. @@ -85,7 +74,9 @@ public class TestRpcHandlerException { PriorityFunction qosFunction = mock(PriorityFunction.class); Abortable abortable = new AbortServer(); RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0); - RpcServer rpcServer = new TestRpcServer(scheduler); + RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer", + Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, scheduler); try (BlockingRpcClient client = new BlockingRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java index b7d6f87..c848250 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.testclassification.SecurityTests; @@ -250,7 +251,7 @@ public class TestSecureIPC { InetSocketAddress isa = new InetSocketAddress(HOST, 0); - RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestSecureIPC", + RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa, serverConf, new FifoRpcScheduler(serverConf, 1)); rpcServer.start(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index b7517bf0..92eaecc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -51,14 +49,12 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.WhoAmIResponse; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.security.SecurityInfo; @@ -78,7 +74,6 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.net.DNS; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.token.SecretManager; @@ -188,8 +183,8 @@ public class TestTokenAuthentication { }; sai.add(new BlockingServiceAndInterface(proxy, AuthenticationProtos.AuthenticationService.BlockingInterface.class)); - this.rpcServer = - new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); + this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, + initialIsa, conf, new FifoRpcScheduler(conf, 1)); InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed");
