Repository: hbase Updated Branches: refs/heads/master c536c8511 -> 45af3831f
http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index 7efe198..565f5bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -27,33 +25,12 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.experimental.categories.Category; @@ -65,8 +42,6 @@ import org.junit.runners.Parameterized.Parameters; @Category({ RPCTests.class, SmallTests.class }) public class TestAsyncIPC extends AbstractTestIPC { - private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); - @Parameters public static Collection<Object[]> parameters() { List<Object[]> paramList = new ArrayList<>(); @@ -92,8 +67,8 @@ public class TestAsyncIPC extends AbstractTestIPC { if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) { if (useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup) - || (!useNativeTransport - && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) { + || (!useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP + .getFirst() instanceof NioEventLoopGroup))) { AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully(); AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null; } @@ -123,80 +98,16 @@ public class TestAsyncIPC extends AbstractTestIPC { protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { setConf(conf); return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - promise.setFailure(new RuntimeException("Injected fault")); - } - }); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + promise.setFailure(new RuntimeException("Injected fault")); } }); - } - - public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { - if (args.length != 2) { - System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>"); - return; - } - // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); - // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); - int cycles = Integer.parseInt(args[0]); - int cellcount = Integer.parseInt(args[1]); - Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - KeyValue kv = BIG_CELL; - Put p = new Put(CellUtil.cloneRow(kv)); - for (int i = 0; i < cellcount; i++) { - p.add(kv); - } - RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); - rm.add(p); - try (AsyncRpcClient client = new AsyncRpcClient(conf)) { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); } - long startTime = System.currentTimeMillis(); - User user = User.getCurrent(); - for (int i = 0; i < cycles; i++) { - List<CellScannable> cells = new ArrayList<>(); - // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = - RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier - .newBuilder() - .setType(RegionSpecifierType.REGION_NAME) - .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); - if (i % 100000 == 0) { - LOG.info("" + i); - // Uncomment this for a thread dump every so often. - // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); - } - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair<Message, CellScanner> response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); - /* - * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), - * count); - */ - } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " - + (System.currentTimeMillis() - startTime) + "ms"); - } finally { - rpcServer.stop(); - } + }); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 56de07d..b88cb7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -22,37 +22,14 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import javax.net.SocketFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.net.NetUtils; @@ -64,8 +41,6 @@ import org.mockito.stubbing.Answer; @Category({ RPCTests.class, SmallTests.class }) public class TestIPC extends AbstractTestIPC { - private static final Log LOG = LogFactory.getLog(TestIPC.class); - @Override protected RpcClientImpl createRpcClientNoCodec(Configuration conf) { return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) { @@ -96,71 +71,4 @@ public class TestIPC extends AbstractTestIPC { return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); } - - public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { - if (args.length != 2) { - System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>"); - return; - } - // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); - // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); - int cycles = Integer.parseInt(args[0]); - int cellcount = Integer.parseInt(args[1]); - Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); - KeyValue kv = BIG_CELL; - Put p = new Put(CellUtil.cloneRow(kv)); - for (int i = 0; i < cellcount; i++) { - p.add(kv); - } - RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); - rm.add(p); - try { - rpcServer.start(); - long startTime = System.currentTimeMillis(); - User user = User.getCurrent(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - for (int i = 0; i < cycles; i++) { - List<CellScannable> cells = new ArrayList<>(); - // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = - RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier - .newBuilder() - .setType(RegionSpecifierType.REGION_NAME) - .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); - if (i % 100000 == 0) { - LOG.info("" + i); - // Uncomment this for a thread dump every so often. - // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); - } - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair<Message, CellScanner> response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); - /* - * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), - * count); - */ - } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " - + (System.currentTimeMillis() - startTime) + "ms"); - } finally { - client.close(); - rpcServer.stop(); - } - } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 81869b4..dcde844 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -17,42 +17,39 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.net.InetSocketAddress; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.junit.Assert; -import org.junit.Test; -import org.junit.Before; import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** - * Test for testing protocol buffer based RPC mechanism. - * This test depends on test.proto definition of types in <code>src/test/protobuf/test.proto</code> - * and protobuf service definition from <code>src/test/protobuf/test_rpc_service.proto</code> + * Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition + * of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from + * <code>src/test/protobuf/test_rpc_service.proto</code> */ -@Category({RPCTests.class, MediumTests.class}) +@Category({ RPCTests.class, MediumTests.class }) public class TestProtoBufRpc { public final static String ADDRESS = "localhost"; public static int PORT = 0; @@ -60,47 +57,18 @@ public class TestProtoBufRpc { private Configuration conf; private RpcServerInterface server; - /** - * Implementation of the test service defined out in TestRpcServiceProtos - */ - static class PBServerImpl - implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface { - @Override - public EmptyResponseProto ping(RpcController unused, - EmptyRequestProto request) throws ServiceException { - return EmptyResponseProto.newBuilder().build(); - } - - @Override - public EchoResponseProto echo(RpcController unused, EchoRequestProto request) - throws ServiceException { - return EchoResponseProto.newBuilder().setMessage(request.getMessage()) - .build(); - } - - @Override - public EmptyResponseProto error(RpcController unused, - EmptyRequestProto request) throws ServiceException { - throw new ServiceException("error", new IOException("error")); - } - } - @Before - public void setUp() throws IOException { // Setup server for both protocols + public void setUp() throws IOException { // Setup server for both protocols this.conf = HBaseConfiguration.create(); Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer"); log.setLevel(Level.DEBUG); log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace"); log.setLevel(Level.TRACE); // Create server side implementation - PBServerImpl serverImpl = new PBServerImpl(); - BlockingService service = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl); // Get RPC server for server side implementation this.server = new RpcServer(null, "testrpc", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - new InetSocketAddress(ADDRESS, PORT), conf, - new FifoRpcScheduler(conf, 10)); + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); InetSocketAddress address = server.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); @@ -118,25 +86,20 @@ public class TestProtoBufRpc { public void testProtoBufRpc() throws Exception { RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); try { - BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + BlockingInterface stub = newBlockingStub(rpcClient, this.isa); // Test ping method - TestProtos.EmptyRequestProto emptyRequest = - TestProtos.EmptyRequestProto.newBuilder().build(); + TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build(); stub.ping(null, emptyRequest); // Test echo method EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoResponseProto echoResponse = stub.echo(null, echoRequest); - Assert.assertEquals(echoResponse.getMessage(), "hello"); + assertEquals(echoResponse.getMessage(), "hello"); // Test error method - error should be thrown as RemoteException try { stub.error(null, emptyRequest); - Assert.fail("Expected exception is not thrown"); + fail("Expected exception is not thrown"); } catch (ServiceException e) { } } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java new file mode 100644 index 0000000..ce7521e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Threads; + [email protected] +public class TestProtobufRpcServiceImpl implements BlockingInterface { + + public static final BlockingService SERVICE = TestProtobufRpcProto + .newReflectiveBlockingService(new TestProtobufRpcServiceImpl()); + + public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr) + throws IOException { + return newBlockingStub(client, addr, User.getCurrent()); + } + + public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr, + User user) throws IOException { + return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel( + ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0)); + } + + public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { + return TestProtobufRpcProto.newStub(client.createProtobufRpcChannel( + ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0)); + } + + @Override + public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return EmptyResponseProto.getDefaultInstance(); + } + + @Override + public EchoResponseProto echo(RpcController controller, EchoRequestProto request) + throws ServiceException { + if (controller instanceof PayloadCarryingRpcController) { + PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; + // If cells, scan them to check we are able to iterate what we were given and since this is an + // echo, just put them back on the controller creating a new block. Tests our block building. + CellScanner cellScanner = pcrc.cellScanner(); + List<Cell> list = null; + if (cellScanner != null) { + list = new ArrayList<>(); + try { + while (cellScanner.advance()) { + list.add(cellScanner.current()); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + cellScanner = CellUtil.createCellScanner(list); + ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); + } + return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); + } + + @Override + public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) + throws ServiceException { + throw new ServiceException(new DoNotRetryIOException("server error!")); + } + + @Override + public EmptyResponseProto pause(RpcController controller, PauseRequestProto request) + throws ServiceException { + Threads.sleepWithoutInterrupt(request.getMs()); + return EmptyResponseProto.getDefaultInstance(); + } + + @Override + public AddrResponseProto addr(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress()) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index a37ba11..749009f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -17,108 +17,31 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.common.collect.ImmutableList; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.mockito.Mockito.mock; + import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Pair; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import static org.mockito.Mockito.mock; -@Category({RPCTests.class, SmallTests.class}) +@Category({ RPCTests.class, SmallTests.class }) public class TestRpcHandlerException { - private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class); - static String example = "xyz"; - static byte[] CELL_BYTES = example.getBytes(); - static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); private final static Configuration CONF = HBaseConfiguration.create(); - RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class); - - // We are using the test TestRpcServiceProtos generated classes and Service because they are - // available and basic with methods like 'echo', and ping. Below we make a blocking service - // by passing in implementation of blocking interface. We use this service in all tests that - // follow. - private static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos - .TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws Error, RuntimeException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List<Cell> list = null; - if (cellScanner != null) { - list = new ArrayList<Cell>(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - throw new StackOverflowError(); - } - } catch (StackOverflowError e) { - throw e; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); /** * Instance of server. We actually don't do anything speical in here so could just use @@ -126,29 +49,18 @@ public class TestRpcHandlerException { */ private static class TestRpcServer extends RpcServer { - TestRpcServer() throws IOException { - this(new FifoRpcScheduler(CONF, 1)); - } - TestRpcServer(RpcScheduler scheduler) throws IOException { super(null, "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); - } - - @Override - public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, scheduler); } } - /** Tests that the rpc scheduler is called when requests arrive. - * When Rpc handler thread dies, the client will hang and the test will fail. - * The test is meant to be a unit test to test the behavior. - * - * */ + /** + * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the + * client will hang and the test will fail. The test is meant to be a unit test to test the + * behavior. + */ private class AbortServer implements Abortable { private boolean aborted = false; @@ -163,7 +75,8 @@ public class TestRpcHandlerException { } } - /* This is a unit test to make sure to abort region server when the number of Rpc handler thread + /* + * This is a unit test to make sure to abort region server when the number of Rpc handler thread * caught errors exceeds the threshold. Client will hang when RS aborts. */ @Ignore @@ -173,21 +86,12 @@ public class TestRpcHandlerException { Abortable abortable = new AbortServer(); RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0); RpcServer rpcServer = new TestRpcServer(scheduler); - RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT); - try { + try (RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController controller = - new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - address, new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()); } catch (Throwable e) { - assert(abortable.isAborted() == true); + assert (abortable.isAborted() == true); } finally { rpcServer.stop(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java index 385b7b0..c1b8de7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.security; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; @@ -25,6 +27,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -32,27 +37,21 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.ThreadLocalRandom; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import javax.security.sasl.SaslException; + import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -64,12 +63,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.Mockito; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; - -import javax.security.sasl.SaslException; - public abstract class AbstractTestSecureIPC { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -77,55 +70,6 @@ public abstract class AbstractTestSecureIPC { private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri() .getPath()); - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService( - new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public TestProtos.EmptyResponseProto ping(RpcController controller, - TestProtos.EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public TestProtos.EmptyResponseProto error(RpcController controller, - TestProtos.EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public TestProtos.EchoResponseProto echo(RpcController controller, - TestProtos.EchoRequestProto request) - throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List<Cell> list = null; - if (cellScanner != null) { - list = new ArrayList<Cell>(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - } - } catch (IOException e) { - throw new ServiceException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return TestProtos.EchoResponseProto.newBuilder() - .setMessage(request.getMessage()).build(); - } - }); - private static MiniKdc KDC; private static String HOST = "localhost"; private static String PRINCIPAL; @@ -262,16 +206,8 @@ public abstract class AbstractTestSecureIPC { rpcServer.start(); try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - BlockingRpcChannel channel = - rpcClient.createBlockingRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), clientUser, 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(), + clientUser); List<String> results = new ArrayList<>(); TestThread th1 = new TestThread(stub, results); final Throwable exception[] = new Throwable[1]; @@ -298,11 +234,11 @@ public abstract class AbstractTestSecureIPC { } public static class TestThread extends Thread { - private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; + private final BlockingInterface stub; private final List<String> results; - public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) { + public TestThread(BlockingInterface stub, List<String> results) { this.stub = stub; this.results = results; }
