http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index bbf8720..3c3d06d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -17,7 +17,13 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; @@ -25,11 +31,14 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.IOException; -import java.net.ConnectException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -39,36 +48,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; -import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** * Some basic ipc tests. */ @@ -76,59 +71,11 @@ public abstract class AbstractTestIPC { private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class); - private static byte[] CELL_BYTES = Bytes.toBytes("xyz"); - private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); + private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); static byte[] BIG_CELL_BYTES = new byte[10 * 1024]; static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); static final Configuration CONF = HBaseConfiguration.create(); - // We are using the test TestRpcServiceProtos generated classes and Service because they are - // available and basic with methods like 'echo', and ping. Below we make a blocking service - // by passing in implementation of blocking interface. We use this service in all tests that - // follow. - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List<Cell> list = null; - if (cellScanner != null) { - list = new ArrayList<Cell>(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - } - } catch (IOException e) { - throw new ServiceException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); /** * Instance of server. We actually don't do anything speical in here so could just use @@ -145,149 +92,106 @@ public abstract class AbstractTestIPC { } TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { - super(null, "testRpcServer", Lists - .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( - "localhost", 0), conf, scheduler); - } - - @Override - public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); + super(null, "testRpcServer", + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), conf, scheduler); } } - protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf); + protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); /** * Ensure we do not HAVE TO HAVE a codec. - * @throws InterruptedException - * @throws IOException */ @Test - public void testNoCodec() throws InterruptedException, IOException { + public void testNoCodec() throws IOException, ServiceException { Configuration conf = HBaseConfiguration.create(); - AbstractRpcClient client = createRpcClientNoCodec(conf); TestRpcServer rpcServer = new TestRpcServer(); - try { + try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - final String message = "hello"; - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - Pair<Message, CellScanner> r = - client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); - assertTrue(r.getSecond() == null); - // Silly assertion that the message is in the returned pb. - assertTrue(r.getFirst().toString().contains(message)); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + assertNull(pcrc.cellScanner()); } finally { - client.close(); rpcServer.stop(); } } - protected abstract AbstractRpcClient createRpcClient(Configuration conf); + protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf); /** * It is hard to verify the compression is actually happening under the wraps. Hope that if * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to * confirm that compression is happening down in the client and server). - * @throws IOException - * @throws InterruptedException - * @throws SecurityException - * @throws NoSuchMethodException */ @Test - public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, - NoSuchMethodException, ServiceException { + public void testCompressCellBlock() throws IOException, ServiceException { Configuration conf = new Configuration(HBaseConfiguration.create()); - conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); - List<Cell> cells = new ArrayList<Cell>(); + // conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); + List<Cell> cells = new ArrayList<>(); int count = 3; for (int i = 0; i < count; i++) { cells.add(CELL); } - AbstractRpcClient client = createRpcClient(conf); TestRpcServer rpcServer = new TestRpcServer(); - try { + try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - Pair<Message, CellScanner> r = - client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); int index = 0; - while (r.getSecond().advance()) { - assertTrue(CELL.equals(r.getSecond().current())); + CellScanner cellScanner = pcrc.cellScanner(); + assertNotNull(cellScanner); + while (cellScanner.advance()) { + assertEquals(CELL, cellScanner.current()); index++; } assertEquals(count, index); } finally { - client.close(); rpcServer.stop(); } } - protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) - throws IOException; + protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup( + Configuration conf) throws IOException; @Test public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); - AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf); - try { + try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - client.call(null, md, param, null, User.getCurrent(), address, - new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.ping(null, EmptyRequestProto.getDefaultInstance()); fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault")); } finally { - client.close(); rpcServer.stop(); } } - /** Tests that the rpc scheduler is called when requests arrive. */ + /** + * Tests that the rpc scheduler is called when requests arrive. + */ @Test - public void testRpcScheduler() throws IOException, InterruptedException { + public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); RpcServer rpcServer = new TestRpcServer(scheduler, CONF); verify(scheduler).init((RpcScheduler.Context) anyObject()); - AbstractRpcClient client = createRpcClient(CONF); - try { + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); verify(scheduler).start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } for (int i = 0; i < 10; i++) { - client.call(new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + stub.echo(null, param); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { @@ -298,109 +202,194 @@ public abstract class AbstractTestIPC { /** Tests that the rpc scheduler is called when requests arrive. */ @Test - public void testRpcMaxRequestSize() throws IOException, InterruptedException { + public void testRpcMaxRequestSize() throws IOException, ServiceException { Configuration conf = new Configuration(CONF); conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); RpcServer rpcServer = new TestRpcServer(conf); - AbstractRpcClient client = createRpcClient(conf); - try { + try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); StringBuilder message = new StringBuilder(1200); for (int i = 0; i < 200; i++) { message.append("hello."); } - // set total RPC size bigger than 1000 bytes + // set total RPC size bigger than 100 bytes EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - try { - client.call(new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); - fail("RPC should have failed because it exceeds max request size"); - } catch(IOException e) { - LOG.info("Caught expected exception: " + e); - assertTrue(e.toString(), - StringUtils.stringifyException(e).contains("RequestTooBigException")); - } + stub.echo( + new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), + param); + fail("RPC should have failed because it exceeds max request size"); + } catch (ServiceException e) { + LOG.info("Caught expected exception: " + e); + assertTrue(e.toString(), + StringUtils.stringifyException(e).contains("RequestTooBigException")); } finally { rpcServer.stop(); } } /** - * Instance of RpcServer that echoes client hostAddress back to client + * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null + * remoteAddress set to its Call Object + * @throws ServiceException */ - static class TestRpcServer1 extends RpcServer { - - private static BlockingInterface SERVICE1 = - new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - @Override - public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) - throws ServiceException { - return EmptyResponseProto.newBuilder().build(); - } - - @Override - public EchoResponseProto echo(RpcController unused, EchoRequestProto request) - throws ServiceException { - final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress(); - final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress(); - return EchoResponseProto.newBuilder().setMessage(message).build(); - } - - @Override - public EmptyResponseProto error(RpcController unused, EmptyRequestProto request) - throws ServiceException { - throw new ServiceException("error", new IOException("error")); - } - }; - - TestRpcServer1() throws IOException { - this(new FifoRpcScheduler(CONF, 1)); + @Test + public void testRpcServerForNotNullRemoteAddressInCallObject() + throws IOException, ServiceException { + TestRpcServer rpcServer = new TestRpcServer(); + InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { + rpcServer.start(); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + assertEquals(localAddr.getAddress().getHostAddress(), + stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr()); + } finally { + rpcServer.stop(); } + } - TestRpcServer1(RpcScheduler scheduler) throws IOException { - super(null, "testRemoteAddressInCallObject", Lists - .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(SERVICE1), null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); + @Test + public void testRemoteError() throws IOException, ServiceException { + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { + rpcServer.start(); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.error(null, EmptyRequestProto.getDefaultInstance()); + } catch (ServiceException e) { + LOG.info("Caught expected exception: " + e); + IOException ioe = ProtobufUtil.handleRemoteException(e); + assertTrue(ioe instanceof DoNotRetryIOException); + assertTrue(ioe.getMessage().contains("server error!")); + } finally { + rpcServer.stop(); } } - /** - * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null - * remoteAddress set to its Call Object - * @throws ServiceException - */ @Test - public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, - ServiceException { - final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1); - final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler); - final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); - final AbstractRpcClient client = - new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null); - try { + public void testTimeout() throws IOException { + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); - final InetSocketAddress isa = rpcServer.getListenerAddress(); - if (isa == null) { - throw new IOException("Listener channel is closed"); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + int ms = 1000; + int timeout = 100; + for (int i = 0; i < 10; i++) { + pcrc.reset(); + pcrc.setCallTimeout(timeout); + long startTime = System.nanoTime(); + try { + stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()); + } catch (ServiceException e) { + long waitTime = (System.nanoTime() - startTime) / 1000000; + // expected + LOG.info("Caught expected exception: " + e); + IOException ioe = ProtobufUtil.handleRemoteException(e); + assertTrue(ioe.getCause() instanceof CallTimeoutException); + // confirm that we got exception before the actual pause. + assertTrue(waitTime < ms); + } } - final BlockingRpcChannel channel = - client.createBlockingRpcChannel( - ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); - final EchoRequestProto echoRequest = - EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build(); - final EchoResponseProto echoResponse = stub.echo(null, echoRequest); - Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage()); + } finally { + rpcServer.stop(); + } + } + + static class TestFailingRpcServer extends TestRpcServer { + + TestFailingRpcServer() throws IOException { + this(new FifoRpcScheduler(CONF, 1), CONF); + } + + TestFailingRpcServer(Configuration conf) throws IOException { + this(new FifoRpcScheduler(conf, 1), conf); + } + + TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { + super(scheduler, conf); + } + + class FailingConnection extends Connection { + public FailingConnection(SocketChannel channel, long lastContact) { + super(channel, lastContact); + } + + @Override + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + // this will throw exception after the connection header is read, and an RPC is sent + // from client + throw new DoNotRetryIOException("Failing for test"); + } + } + + @Override + protected Connection getConnection(SocketChannel channel, long time) { + return new FailingConnection(channel, time); + } + } + + /** Tests that the connection closing is handled by the client with outstanding RPC calls */ + @Test + public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { + Configuration conf = new Configuration(CONF); + RpcServer rpcServer = new TestFailingRpcServer(conf); + try (AbstractRpcClient<?> client = createRpcClient(conf)) { + rpcServer.start(); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + stub.echo(null, param); + fail("RPC should have failed because connection closed"); + } catch (ServiceException e) { + LOG.info("Caught expected exception: " + e.toString()); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testAsyncEcho() throws IOException { + Configuration conf = HBaseConfiguration.create(); + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient<?> client = createRpcClient(conf)) { + rpcServer.start(); + Interface stub = newStub(client, rpcServer.getListenerAddress()); + int num = 10; + List<HBaseRpcController> pcrcList = new ArrayList<>(); + List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>(); + for (int i = 0; i < num; i++) { + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>(); + stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done); + pcrcList.add(pcrc); + callbackList.add(done); + } + for (int i = 0; i < num; i++) { + HBaseRpcController pcrc = pcrcList.get(i); + assertFalse(pcrc.failed()); + assertNull(pcrc.cellScanner()); + assertEquals("hello-" + i, callbackList.get(i).get().getMessage()); + } + } finally { + rpcServer.stop(); + } + } + + @Test + public void testAsyncRemoteError() throws IOException { + AbstractRpcClient<?> client = createRpcClient(CONF); + TestRpcServer rpcServer = new TestRpcServer(); + try { + rpcServer.start(); + Interface stub = newStub(client, rpcServer.getListenerAddress()); + BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback); + assertNull(callback.get()); + assertTrue(pcrc.failed()); + LOG.info("Caught expected exception: " + pcrc.getFailed()); + IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); + assertTrue(ioe instanceof DoNotRetryIOException); + assertTrue(ioe.getMessage().contains("server error!")); } finally { client.close(); rpcServer.stop(); @@ -408,17 +397,38 @@ public abstract class AbstractTestIPC { } @Test - public void testWrapException() throws Exception { - AbstractRpcClient client = - (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC"); - final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); - assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException); - assertTrue(client.wrapException(address, - new SocketTimeoutException()) instanceof SocketTimeoutException); - assertTrue(client.wrapException(address, new ConnectionClosingException( - "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException); - assertTrue(client - .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) - .getCause() instanceof CallTimeoutException); + public void testAsyncTimeout() throws IOException { + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { + rpcServer.start(); + Interface stub = newStub(client, rpcServer.getListenerAddress()); + List<HBaseRpcController> pcrcList = new ArrayList<>(); + List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>(); + int ms = 1000; + int timeout = 100; + long startTime = System.nanoTime(); + for (int i = 0; i < 10; i++) { + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + pcrc.setCallTimeout(timeout); + BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); + stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback); + pcrcList.add(pcrc); + callbackList.add(callback); + } + for (BlockingRpcCallback<?> callback : callbackList) { + assertNull(callback.get()); + } + long waitTime = (System.nanoTime() - startTime) / 1000000; + for (HBaseRpcController pcrc : pcrcList) { + assertTrue(pcrc.failed()); + LOG.info("Caught expected exception: " + pcrc.getFailed()); + IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); + assertTrue(ioe.getCause() instanceof CallTimeoutException); + } + // confirm that we got exception before the actual pause. + assertTrue(waitTime < ms); + } finally { + rpcServer.stop(); + } } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java deleted file mode 100644 index d9b3e49..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.util.StringUtils; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; - -@RunWith(Parameterized.class) -@Category({ SmallTests.class }) -public class TestAsyncIPC extends AbstractTestIPC { - - private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); - - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - @Parameters - public static Collection<Object[]> parameters() { - List<Object[]> paramList = new ArrayList<Object[]>(); - paramList.add(new Object[] { false, false }); - paramList.add(new Object[] { false, true }); - paramList.add(new Object[] { true, false }); - paramList.add(new Object[] { true, true }); - return paramList; - } - - private final boolean useNativeTransport; - - private final boolean useGlobalEventLoopGroup; - - public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) { - this.useNativeTransport = useNativeTransport; - this.useGlobalEventLoopGroup = useGlobalEventLoopGroup; - } - - private void setConf(Configuration conf) { - conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport); - conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup); - if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) { - if (useNativeTransport - && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup) - || (!useNativeTransport - && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) { - AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully(); - AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null; - } - } - } - - @Override - protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) { - setConf(conf); - return new AsyncRpcClient(conf) { - - @Override - Codec getCodec() { - return null; - } - - }; - } - - @Override - protected AsyncRpcClient createRpcClient(Configuration conf) { - setConf(conf); - return new AsyncRpcClient(conf); - } - - @Override - protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { - setConf(conf); - return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - promise.setFailure(new RuntimeException("Injected fault")); - } - }); - } - }); - } - - @Test - public void testAsyncConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = createRpcClient(CONF); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = - client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType() - .toProto(), new RpcCallback<Message>() { - @Override - public void run(Message parameter) { - done.set(true); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - client.close(); - rpcServer.stop(); - } - } - - @Test - public void testRTEDuringAsyncConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = - client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.notifyOnFail(new RpcCallback<IOException>() { - @Override - public void run(IOException e) { - done.set(true); - LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } - }); - - channel.callMethod(md, controller, param, md.getOutputType().toProto(), - new RpcCallback<Message>() { - @Override - public void run(Message parameter) { - done.set(true); - fail("Expected an exception to have been thrown!"); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - client.close(); - rpcServer.stop(); - } - } - - public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { - if (args.length != 2) { - System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>"); - return; - } - // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); - // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); - int cycles = Integer.parseInt(args[0]); - int cellcount = Integer.parseInt(args[1]); - Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - AsyncRpcClient client = new AsyncRpcClient(conf); - KeyValue kv = BIG_CELL; - Put p = new Put(CellUtil.cloneRow(kv)); - for (int i = 0; i < cellcount; i++) { - p.add(kv); - } - RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); - rm.add(p); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - long startTime = System.currentTimeMillis(); - User user = User.getCurrent(); - for (int i = 0; i < cycles; i++) { - List<CellScannable> cells = new ArrayList<CellScannable>(); - // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = - RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier - .newBuilder() - .setType(RegionSpecifierType.REGION_NAME) - .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); - if (i % 100000 == 0) { - LOG.info("" + i); - // Uncomment this for a thread dump every so often. - // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); - } - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair<Message, CellScanner> response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); - /* - * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), - * count); - */ - } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " - + (System.currentTimeMillis() - startTime) + "ms"); - } finally { - client.close(); - rpcServer.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java new file mode 100644 index 0000000..98efcfb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category({ RPCTests.class, SmallTests.class }) +public class TestBlockingIPC extends AbstractTestIPC { + + @Override + protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) { + return new BlockingRpcClient(conf) { + @Override + Codec getCodec() { + return null; + } + }; + } + + @Override + protected BlockingRpcClient createRpcClient(Configuration conf) { + return new BlockingRpcClient(conf); + } + + @Override + protected BlockingRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) + throws IOException { + return new BlockingRpcClient(conf) { + + @Override + boolean isTcpNoDelay() { + throw new RuntimeException("Injected fault"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java deleted file mode 100644 index e294830..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ SmallTests.class }) -public class TestGlobalEventLoopGroup { - - @Test - public void test() { - Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true); - AsyncRpcClient client = new AsyncRpcClient(conf); - assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP); - AsyncRpcClient client1 = new AsyncRpcClient(conf); - assertSame(client.bootstrap.group(), client1.bootstrap.group()); - client1.close(); - assertFalse(client.bootstrap.group().isShuttingDown()); - - conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false); - AsyncRpcClient client2 = new AsyncRpcClient(conf); - assertNotSame(client.bootstrap.group(), client2.bootstrap.group()); - client2.close(); - - client.close(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java deleted file mode 100644 index d3dbd33..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.List; - -import javax.net.SocketFactory; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.net.NetUtils; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; - -@Category({ SmallTests.class }) -public class TestIPC extends AbstractTestIPC { - - private static final Log LOG = LogFactory.getLog(TestIPC.class); - - @Override - protected RpcClientImpl createRpcClientNoCodec(Configuration conf) { - return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) { - @Override - Codec getCodec() { - return null; - } - }; - } - - @Override - protected RpcClientImpl createRpcClient(Configuration conf) { - return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); - } - - @Override - protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf) - throws IOException { - SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf)); - Mockito.doAnswer(new Answer<Socket>() { - @Override - public Socket answer(InvocationOnMock invocation) throws Throwable { - Socket s = spy((Socket) invocation.callRealMethod()); - doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt()); - return s; - } - }).when(spyFactory).createSocket(); - - return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); - } - - public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { - if (args.length != 2) { - System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>"); - return; - } - // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); - // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); - int cycles = Integer.parseInt(args[0]); - int cellcount = Integer.parseInt(args[1]); - Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); - KeyValue kv = BIG_CELL; - Put p = new Put(CellUtil.cloneRow(kv)); - for (int i = 0; i < cellcount; i++) { - p.add(kv); - } - RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); - rm.add(p); - try { - rpcServer.start(); - long startTime = System.currentTimeMillis(); - User user = User.getCurrent(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - for (int i = 0; i < cycles; i++) { - List<CellScannable> cells = new ArrayList<CellScannable>(); - // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = - RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier - .newBuilder() - .setType(RegionSpecifierType.REGION_NAME) - .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); - if (i % 100000 == 0) { - LOG.info("" + i); - // Uncomment this for a thread dump every so often. - // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); - } - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair<Message, CellScanner> response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); - /* - * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), - * count); - */ - } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " - + (System.currentTimeMillis() - startTime) + "ms"); - } finally { - client.close(); - rpcServer.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java new file mode 100644 index 0000000..3b32383 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.JVM; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ RPCTests.class, SmallTests.class }) +public class TestNettyIPC extends AbstractTestIPC { + + @Parameters(name = "{index}: EventLoop={0}") + public static Collection<Object[]> parameters() { + List<Object[]> params = new ArrayList<>(); + params.add(new Object[] { "nio" }); + params.add(new Object[] { "perClientNio" }); + if (JVM.isLinux() && JVM.isAmd64()) { + params.add(new Object[] { "epoll" }); + } + return params; + } + + @Parameter + public String eventLoopType; + + private static NioEventLoopGroup NIO; + + private static EpollEventLoopGroup EPOLL; + + @BeforeClass + public static void setUpBeforeClass() { + NIO = new NioEventLoopGroup(); + if (JVM.isLinux() && JVM.isAmd64()) { + EPOLL = new EpollEventLoopGroup(); + } + } + + @AfterClass + public static void tearDownAfterClass() { + if (NIO != null) { + NIO.shutdownGracefully(); + } + if (EPOLL != null) { + EPOLL.shutdownGracefully(); + } + } + + private void setConf(Configuration conf) { + switch (eventLoopType) { + case "nio": + NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, NioSocketChannel.class); + break; + case "epoll": + NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class); + break; + case "perClientNio": + NettyRpcClientConfigHelper.createEventLoopPerClient(conf); + break; + default: + break; + } + } + + @Override + protected NettyRpcClient createRpcClientNoCodec(Configuration conf) { + setConf(conf); + return new NettyRpcClient(conf) { + + @Override + Codec getCodec() { + return null; + } + + }; + } + + @Override + protected NettyRpcClient createRpcClient(Configuration conf) { + setConf(conf); + return new NettyRpcClient(conf); + } + + @Override + protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { + setConf(conf); + return new NettyRpcClient(conf) { + + @Override + boolean isTcpNoDelay() { + throw new RuntimeException("Injected fault"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index ffb3927..3df4cdc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -17,41 +17,39 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.net.InetSocketAddress; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.junit.Assert; -import org.junit.Test; -import org.junit.Before; import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** - * Test for testing protocol buffer based RPC mechanism. - * This test depends on test.proto definition of types in <code>src/test/protobuf/test.proto</code> - * and protobuf service definition from <code>src/test/protobuf/test_rpc_service.proto</code> + * Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition + * of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from + * <code>src/test/protobuf/test_rpc_service.proto</code> */ -@Category(MediumTests.class) +@Category({ RPCTests.class, MediumTests.class }) public class TestProtoBufRpc { public final static String ADDRESS = "localhost"; public static int PORT = 0; @@ -59,47 +57,18 @@ public class TestProtoBufRpc { private Configuration conf; private RpcServerInterface server; - /** - * Implementation of the test service defined out in TestRpcServiceProtos - */ - static class PBServerImpl - implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface { - @Override - public EmptyResponseProto ping(RpcController unused, - EmptyRequestProto request) throws ServiceException { - return EmptyResponseProto.newBuilder().build(); - } - - @Override - public EchoResponseProto echo(RpcController unused, EchoRequestProto request) - throws ServiceException { - return EchoResponseProto.newBuilder().setMessage(request.getMessage()) - .build(); - } - - @Override - public EmptyResponseProto error(RpcController unused, - EmptyRequestProto request) throws ServiceException { - throw new ServiceException("error", new IOException("error")); - } - } - @Before - public void setUp() throws IOException { // Setup server for both protocols + public void setUp() throws IOException { // Setup server for both protocols this.conf = HBaseConfiguration.create(); Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer"); log.setLevel(Level.DEBUG); log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace"); log.setLevel(Level.TRACE); // Create server side implementation - PBServerImpl serverImpl = new PBServerImpl(); - BlockingService service = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl); // Get RPC server for server side implementation this.server = new RpcServer(null, "testrpc", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - new InetSocketAddress(ADDRESS, PORT), conf, - new FifoRpcScheduler(conf, 10)); + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); InetSocketAddress address = server.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); @@ -113,31 +82,23 @@ public class TestProtoBufRpc { server.stop(); } - @Test + @Test(expected = ServiceException.class + /* Thrown when we call stub.error */) public void testProtoBufRpc() throws Exception { RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); try { - BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + BlockingInterface stub = newBlockingStub(rpcClient, this.isa); // Test ping method - TestProtos.EmptyRequestProto emptyRequest = - TestProtos.EmptyRequestProto.newBuilder().build(); + TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build(); stub.ping(null, emptyRequest); // Test echo method EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoResponseProto echoResponse = stub.echo(null, echoRequest); - Assert.assertEquals(echoResponse.getMessage(), "hello"); + assertEquals(echoResponse.getMessage(), "hello"); - // Test error method - error should be thrown as RemoteException - try { - stub.error(null, emptyRequest); - Assert.fail("Expected exception is not thrown"); - } catch (ServiceException e) { - } + stub.error(null, emptyRequest); + fail("Expected exception is not thrown"); } finally { rpcClient.close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java new file mode 100644 index 0000000..8f947b1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + + +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Threads; + [email protected] +public class TestProtobufRpcServiceImpl implements BlockingInterface { + + public static final BlockingService SERVICE = TestProtobufRpcProto + .newReflectiveBlockingService(new TestProtobufRpcServiceImpl()); + + public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr) + throws IOException { + return newBlockingStub(client, addr, User.getCurrent()); + } + + public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr, + User user) throws IOException { + return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel( + ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0)); + } + + public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { + return TestProtobufRpcProto.newStub(client.createRpcChannel( + ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0)); + } + + @Override + public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return EmptyResponseProto.getDefaultInstance(); + } + + @Override + public EchoResponseProto echo(RpcController controller, EchoRequestProto request) + throws ServiceException { + if (controller instanceof HBaseRpcController) { + HBaseRpcController pcrc = (HBaseRpcController) controller; + // If cells, scan them to check we are able to iterate what we were given and since this is an + // echo, just put them back on the controller creating a new block. Tests our block building. + CellScanner cellScanner = pcrc.cellScanner(); + List<Cell> list = null; + if (cellScanner != null) { + list = new ArrayList<>(); + try { + while (cellScanner.advance()) { + list.add(cellScanner.current()); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + cellScanner = CellUtil.createCellScanner(list); + pcrc.setCellScanner(cellScanner); + } + return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); + } + + @Override + public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) + throws ServiceException { + throw new ServiceException(new DoNotRetryIOException("server error!")); + } + + @Override + public EmptyResponseProto pause(RpcController controller, PauseRequestProto request) + throws ServiceException { + Threads.sleepWithoutInterrupt(request.getMs()); + return EmptyResponseProto.getDefaultInstance(); + } + + @Override + public AddrResponseProto addr(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress()) + .build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java index 596b8ab..e4ecd10 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java @@ -17,15 +17,20 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; + import java.io.IOException; import java.net.Socket; import java.net.SocketAddress; import java.util.List; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -35,28 +40,26 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.TestRule; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; -import static org.junit.Assert.*; - -@Category(SmallTests.class) +@Category(MediumTests.class) public class TestRpcClientLeaks { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); - public static class MyRpcClientImpl extends RpcClientImpl { + public static class MyRpcClientImpl extends BlockingRpcClient { public static List<Socket> savedSockets = Lists.newArrayList(); @Rule public ExpectedException thrown = ExpectedException.none(); - public MyRpcClientImpl(Configuration conf, String clusterId) { - super(conf, clusterId); + public MyRpcClientImpl(Configuration conf) { + super(conf); } public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address, @@ -65,9 +68,8 @@ public class TestRpcClientLeaks { } @Override - protected Connection createConnection(ConnectionId remoteId, Codec codec, - CompressionCodec compressor) throws IOException { - return new Connection(remoteId, codec, compressor) { + protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException { + return new BlockingRpcConnection(this, remoteId) { @Override protected synchronized void setupConnection() throws IOException { super.setupConnection(); @@ -113,5 +115,4 @@ public class TestRpcClientLeaks { assertTrue("Socket + " + socket + " is not closed", socket.isClosed()); } } -} - +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index 03e9e4e..d197bf2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -17,107 +17,31 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.common.collect.ImmutableList; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.mockito.Mockito.mock; + import com.google.common.collect.Lists; import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Pair; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import static org.mockito.Mockito.mock; -@Category({SmallTests.class}) +@Category({ RPCTests.class, SmallTests.class }) public class TestRpcHandlerException { - private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class); - static String example = "xyz"; - static byte[] CELL_BYTES = example.getBytes(); - static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); private final static Configuration CONF = HBaseConfiguration.create(); - RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class); - - // We are using the test TestRpcServiceProtos generated classes and Service because they are - // available and basic with methods like 'echo', and ping. Below we make a blocking service - // by passing in implementation of blocking interface. We use this service in all tests that - // follow. - private static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos - .TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws Error, RuntimeException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List<Cell> list = null; - if (cellScanner != null) { - list = new ArrayList<Cell>(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - throw new StackOverflowError(); - } - } catch (StackOverflowError e) { - throw e; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); /** * Instance of server. We actually don't do anything speical in here so could just use @@ -125,29 +49,18 @@ public class TestRpcHandlerException { */ private static class TestRpcServer extends RpcServer { - TestRpcServer() throws IOException { - this(new FifoRpcScheduler(CONF, 1)); - } - TestRpcServer(RpcScheduler scheduler) throws IOException { super(null, "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); - } - - @Override - public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); + Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, scheduler); } } - /** Tests that the rpc scheduler is called when requests arrive. - * When Rpc handler thread dies, the client will hang and the test will fail. - * The test is meant to be a unit test to test the behavior. - * - * */ + /** + * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the + * client will hang and the test will fail. The test is meant to be a unit test to test the + * behavior. + */ private class AbortServer implements Abortable { private boolean aborted = false; @@ -162,7 +75,8 @@ public class TestRpcHandlerException { } } - /* This is a unit test to make sure to abort region server when the number of Rpc handler thread + /* + * This is a unit test to make sure to abort region server when the number of Rpc handler thread * caught errors exceeds the threshold. Client will hang when RS aborts. */ @Ignore @@ -172,21 +86,12 @@ public class TestRpcHandlerException { Abortable abortable = new AbortServer(); RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0); RpcServer rpcServer = new TestRpcServer(scheduler); - RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT); - try { + try (BlockingRpcClient client = new BlockingRpcClient(CONF)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController controller = - new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - address, new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()); } catch (Throwable e) { - assert(abortable.isAborted() == true); + assert (abortable.isAborted() == true); } finally { rpcServer.stop(); }
