Repository: hbase Updated Branches: refs/heads/master e05341d01 -> 854f13afa
http://git-wip-us.apache.org/repos/asf/hbase/blob/854f13af/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java index 2df3024..9fb7037 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java @@ -70,6 +70,15 @@ public class JVM { } return (ibmvendor ? linux : true); } + + /** + * Check if the OS is linux. + * + * @return whether this is linux or not. + */ + public static boolean isLinux() { + return linux; + } /** * Check if the finish() method of GZIPOutputStream is broken http://git-wip-us.apache.org/repos/asf/hbase/blob/854f13af/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index c787414..5eb9a52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -44,6 +44,7 @@ import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -1412,9 +1413,9 @@ public class RpcServer implements RpcServerInterface { int count; // Check for 'HBas' magic. this.dataLengthBuffer.flip(); - if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) { + if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) { return doBadPreambleHandling("Expected HEADER=" + - Bytes.toStringBinary(HConstants.RPC_HEADER.array()) + + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) + " from " + toString()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/854f13af/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 081b5dd..0933f52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -33,9 +33,18 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.SocketChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -44,10 +53,13 @@ import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.client.Put; @@ -91,7 +103,10 @@ import com.google.protobuf.ServiceException; */ @Category({RPCTests.class, SmallTests.class}) public class TestIPC { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + public static final Log LOG = LogFactory.getLog(TestIPC.class); + static byte [] CELL_BYTES = Bytes.toBytes("xyz"); static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); static byte [] BIG_CELL_BYTES = new byte [10 * 1024]; @@ -191,8 +206,8 @@ public class TestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); final String message = "hello"; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - Pair<Message, CellScanner> r = client.call(null, md, param, null, - md.getOutputType().toProto(), User.getCurrent(), address, 0); + Pair<Message, CellScanner> r = client.call(null, md, param, + md.getOutputType().toProto(), User.getCurrent(), address); assertTrue(r.getSecond() == null); // Silly assertion that the message is in the returned pb. assertTrue(r.getFirst().toString().contains(message)); @@ -203,6 +218,44 @@ public class TestIPC { } /** + * Ensure we do not HAVE TO HAVE a codec. + * + * @throws InterruptedException + * @throws IOException + */ + @Test public void testNoCodecAsync() throws InterruptedException, IOException, ServiceException { + Configuration conf = HBaseConfiguration.create(); + AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { + @Override Codec getCodec() { + return null; + } + }; + TestRpcServer rpcServer = new TestRpcServer(); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + final String message = "hello"; + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); + + BlockingRpcChannel channel = client + .createBlockingRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 0); + + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + Message response = + channel.callBlockingMethod(md, controller, param, md.getOutputType().toProto()); + + assertTrue(controller.cellScanner() == null); + // Silly assertion that the message is in the returned pb. + assertTrue(response.toString().contains(message)); + } finally { + client.close(); + rpcServer.stop(); + } + } + + /** * It is hard to verify the compression is actually happening under the wraps. Hope that if * unsupported, we'll get an exception out of some time (meantime, have to trace it manually * to confirm that compression is happening down in the client and server). @@ -213,13 +266,17 @@ public class TestIPC { */ @Test public void testCompressCellBlock() - throws IOException, InterruptedException, SecurityException, NoSuchMethodException { + throws IOException, InterruptedException, SecurityException, NoSuchMethodException, + ServiceException { Configuration conf = new Configuration(HBaseConfiguration.create()); conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); - doSimpleTest(conf, new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT)); + doSimpleTest(new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT)); + + // Another test for the async client + doAsyncSimpleTest(new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null)); } - private void doSimpleTest(final Configuration conf, final RpcClientImpl client) + private void doSimpleTest(final RpcClientImpl client) throws InterruptedException, IOException { TestRpcServer rpcServer = new TestRpcServer(); List<Cell> cells = new ArrayList<Cell>(); @@ -230,8 +287,11 @@ public class TestIPC { InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - Pair<Message, CellScanner> r = client.call(null, md, param, CellUtil.createCellScanner(cells), - md.getOutputType().toProto(), User.getCurrent(), address, 0); + + PayloadCarryingRpcController pcrc = + new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); + Pair<Message, CellScanner> r = client + .call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address); int index = 0; while (r.getSecond().advance()) { assertTrue(CELL.equals(r.getSecond().current())); @@ -244,6 +304,42 @@ public class TestIPC { } } + private void doAsyncSimpleTest(final AsyncRpcClient client) + throws InterruptedException, IOException, ServiceException { + TestRpcServer rpcServer = new TestRpcServer(); + List<Cell> cells = new ArrayList<Cell>(); + int count = 3; + for (int i = 0; i < count; i++) + cells.add(CELL); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + PayloadCarryingRpcController pcrc = + new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); + + BlockingRpcChannel channel = client.createBlockingRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + channel.callBlockingMethod(md, pcrc, param, md.getOutputType().toProto()); + + CellScanner cellScanner = pcrc.cellScanner(); + + int index = 0; + while (cellScanner.advance()) { + assertTrue(CELL.equals(cellScanner.current())); + index++; + } + assertEquals(count, index); + } finally { + client.close(); + rpcServer.stop(); + } + } + @Test public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); @@ -264,7 +360,48 @@ public class TestIPC { InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - client.call(null, md, param, null, null, User.getCurrent(), address, 0); + client.call(null, md, param, null, User.getCurrent(), address); + fail("Expected an exception to have been thrown!"); + } catch (Exception e) { + LOG.info("Caught expected exception: " + e.toString()); + assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + } finally { + client.close(); + rpcServer.stop(); + } + } + + @Test + public void testRTEDuringAsyncBlockingConnectionSetup() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + TestRpcServer rpcServer = new TestRpcServer(); + AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, + new ChannelInitializer<SocketChannel>() { + + @Override protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + promise.setFailure(new RuntimeException("Injected fault")); + } + }); + } + }); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + BlockingRpcChannel channel = client.createBlockingRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + channel.callBlockingMethod(md, new PayloadCarryingRpcController(), param, + md.getOutputType().toProto()); + fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); @@ -275,6 +412,106 @@ public class TestIPC { } } + + @Test + public void testRTEDuringAsyncConnectionSetup() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + TestRpcServer rpcServer = new TestRpcServer(); + AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, + new ChannelInitializer<SocketChannel>() { + + @Override protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + promise.setFailure(new RuntimeException("Injected fault")); + } + }); + } + }); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = client.createRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.notifyOnFail(new RpcCallback<IOException>() { + @Override + public void run(IOException e) { + done.set(true); + LOG.info("Caught expected exception: " + e.toString()); + assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + } + }); + + channel.callMethod(md, controller, param, + md.getOutputType().toProto(), new RpcCallback<Message>() { + @Override + public void run(Message parameter) { + done.set(true); + fail("Expected an exception to have been thrown!"); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + client.close(); + rpcServer.stop(); + } + } + + @Test + public void testAsyncConnectionSetup() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + TestRpcServer rpcServer = new TestRpcServer(); + AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = client.createRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + channel.callMethod(md, new PayloadCarryingRpcController(), param, + md.getOutputType().toProto(), new RpcCallback<Message>() { + @Override + public void run(Message parameter) { + done.set(true); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + client.close(); + rpcServer.stop(); + } + } + /** Tests that the rpc scheduler is called when requests arrive. */ @Test public void testRpcScheduler() throws IOException, InterruptedException { @@ -288,8 +525,43 @@ public class TestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); for (int i = 0; i < 10; i++) { - client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), - md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); + client.call( + new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))), + md, param, md.getOutputType().toProto(), User.getCurrent(), + rpcServer.getListenerAddress()); + } + verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); + } finally { + rpcServer.stop(); + verify(scheduler).stop(); + } + } + + /** + * Tests that the rpc scheduler is called when requests arrive. + */ + @Test + public void testRpcSchedulerAsync() + throws IOException, InterruptedException, ServiceException { + RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); + RpcServer rpcServer = new TestRpcServer(scheduler); + verify(scheduler).init((RpcScheduler.Context) anyObject()); + AbstractRpcClient client = new AsyncRpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT, null); + try { + rpcServer.start(); + verify(scheduler).start(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + ServerName serverName = ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), + rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()); + + for (int i = 0; i < 10; i++) { + BlockingRpcChannel channel = client.createBlockingRpcChannel( + serverName, User.getCurrent(), 0); + + channel.callBlockingMethod(md, + new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))), + param, md.getOutputType().toProto()); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { @@ -341,9 +613,10 @@ public class TestIPC { // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), // "Thread dump " + Thread.currentThread().getName()); } - CellScanner cellScanner = CellUtil.createCellScanner(cells); + PayloadCarryingRpcController pcrc = + new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); Pair<Message, CellScanner> response = - client.call(null, md, builder.build(), cellScanner, param, user, address, 0); + client.call(pcrc, md, builder.build(), param, user, address); /* int count = 0; while (p.getSecond().advance()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/854f13af/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index b1e0b69..298f086 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.mockito.Mockito.mock; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,13 +49,12 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; @Category({RPCTests.class, SmallTests.class}) public class TestRpcHandlerException { @@ -178,8 +177,11 @@ public class TestRpcHandlerException { rpcServer.start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md - .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); + PayloadCarryingRpcController controller = + new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); + + client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), + rpcServer.getListenerAddress()); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally {
