Repository: hbase Updated Branches: refs/heads/branch-1 908e5a662 -> 73d677882
HBASE-15177 Reduce garbage created under high load Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/73d67788 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/73d67788 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/73d67788 Branch: refs/heads/branch-1 Commit: 73d67788206c3f60773d861375f5e6934a284418 Parents: 908e5a6 Author: Enis Soztutar <e...@apache.org> Authored: Thu Feb 4 11:07:36 2016 -0800 Committer: Enis Soztutar <e...@apache.org> Committed: Thu Feb 4 13:26:22 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ScannerCallable.java | 14 +-- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 7 +- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 20 ++--- .../hbase/ipc/PayloadCarryingRpcController.java | 7 +- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 6 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 19 +++- .../hadoop/hbase/client/TestClientScanner.java | 2 +- .../apache/hadoop/hbase/ipc/TestIPCUtil.java | 4 +- .../hadoop/hbase/io/ByteBufferInputStream.java | 14 ++- .../hadoop/hbase/util/ByteBufferUtils.java | 25 ++++++ .../org/apache/hadoop/hbase/util/Threads.java | 2 +- .../apache/hadoop/hbase/util/UnsafeAccess.java | 95 +++++++++++++++++++- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 55 +++++++----- .../AnnotationReadingPriorityFunction.java | 9 +- .../hadoop/hbase/regionserver/HRegion.java | 6 +- .../hbase/regionserver/RSRpcServices.java | 15 +++- 16 files changed, 231 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 5100314..8912e58 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -194,6 +194,13 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { if (Thread.interrupted()) { throw new InterruptedIOException(); } + + if (controller == null) { + controller = controllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + } + if (closed) { if (scannerId != -1) { close(); @@ -212,9 +219,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew); ScanResponse response = null; - controller = controllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); try { response = getStub().scan(controller, request); // Client and RS maintain a nextCallSeq number during the scan. 
Every next() call @@ -374,7 +378,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { - getStub().scan(null, request); + getStub().scan(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -391,7 +395,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = getStub().scan(null, request); + ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 44e8322..9fe2cf6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -421,7 +421,7 @@ public class AsyncRpcChannel { requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there one. Let zero be same as no priority. 
- if (call.controller.getPriority() != 0) { + if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { requestHeaderBuilder.setPriority(call.controller.getPriority()); } @@ -669,6 +669,7 @@ public class AsyncRpcChannel { private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, final UserGroupInformation user) throws IOException, InterruptedException { user.doAs(new PrivilegedExceptionAction<Void>() { + @Override public Void run() throws IOException, InterruptedException { if (shouldAuthenticateOverKrb()) { if (currRetries < MAX_SASL_RETRIES) { @@ -711,12 +712,12 @@ public class AsyncRpcChannel { public int getConnectionHashCode() { return ConnectionId.hashCode(ticket, serviceName, address); } - + @Override public int hashCode() { return getConnectionHashCode(); } - + @Override public boolean equals(Object obj) { if (obj instanceof AsyncRpcChannel) { http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 734227c..22c5cc1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.IOException; import java.io.InputStream; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import 
org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; @@ -180,19 +180,18 @@ public class IPCUtil { public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock) throws IOException { - return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); + return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); } /** * @param codec - * @param cellBlock - * @param offset - * @param length + * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be + * position()'ed at the start of the cell block and limit()'ed at the end. * @return CellScanner to work against the content of <code>cellBlock</code> * @throws IOException */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock, final int offset, final int length) + final ByteBuffer cellBlock) throws IOException { // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. @@ -202,18 +201,17 @@ public class IPCUtil { if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); CompressionInputStream cis = - compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length), - poolDecompressor); + compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); ByteBufferOutputStream bbos = null; try { // TODO: This is ugly. The buffer will be resized on us if we guess wrong. // TODO: Reuse buffers. 
- bbos = new ByteBufferOutputStream((length - offset) * + bbos = new ByteBufferOutputStream(cellBlock.remaining() * this.cellBlockDecompressionMultiplier); IOUtils.copy(cis, bbos); bbos.close(); ByteBuffer bb = bbos.getByteBuffer(); - is = new ByteArrayInputStream(bb.array(), 0, bb.limit()); + is = new ByteBufferInputStream(bb); } finally { if (is != null) is.close(); if (bbos != null) bbos.close(); @@ -221,7 +219,7 @@ public class IPCUtil { CodecPool.returnDecompressor(poolDecompressor); } } else { - is = new ByteArrayInputStream(cellBlock, offset, length); + is = new ByteBufferInputStream(cellBlock); } return codec.getDecoder(is); } http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 70f30f9..82634e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -35,14 +35,14 @@ import org.apache.hadoop.hbase.TableName; @InterfaceAudience.Private public class PayloadCarryingRpcController extends TimeLimitedRpcController implements CellScannable { + + public static final int PRIORITY_UNSET = -1; /** * Priority to set on this request. Set it here in controller so available composing the * request. This is the ordained way of setting priorities going forward. We will be * undoing the old annotation-based mechanism. */ - // Currently only multi call makes use of this. Eventually this should be only way to set - // priority. 
- private int priority = HConstants.NORMAL_QOS; + private int priority = PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -67,6 +67,7 @@ public class PayloadCarryingRpcController /** * @return One-shot cell scanner (you cannot back it up and restart) */ + @Override public CellScanner cellScanner() { return cellScanner; } http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 82ff5a9..e1821e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -899,8 +899,10 @@ public class RpcClientImpl extends AbstractRpcClient { cellBlockBuilder.setLength(cellBlock.limit()); builder.setCellBlockMeta(cellBlockBuilder.build()); } - // Only pass priority if there one. Let zero be same as no priority. - if (priority != 0) builder.setPriority(priority); + // Only pass priority if there is one set. 
+ if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { + builder.setPriority(priority); + } RequestHeader header = builder.build(); setupIOstreams(); http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index c5c8b88..bdca5c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2479,13 +2479,13 @@ public final class ProtobufUtil { */ public static String getRegionEncodedName( final RegionSpecifier regionSpecifier) throws DoNotRetryIOException { - byte[] value = regionSpecifier.getValue().toByteArray(); + ByteString value = regionSpecifier.getValue(); RegionSpecifierType type = regionSpecifier.getType(); switch (type) { case REGION_NAME: - return HRegionInfo.encodeRegionName(value); + return HRegionInfo.encodeRegionName(value.toByteArray()); case ENCODED_REGION_NAME: - return Bytes.toString(value); + return value.toStringUtf8(); default: throw new DoNotRetryIOException( "Unsupported region specifier type: " + type); @@ -3211,6 +3211,19 @@ public final class ProtobufUtil { codedInput.checkLastTagWas(0); } + public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length) + throws IOException { + codedInput.resetSizeCounter(); + int prevLimit = codedInput.setSizeLimit(length); + + int limit = codedInput.pushLimit(length); + builder.mergeFrom(codedInput); + codedInput.popLimit(limit); + + codedInput.checkLastTagWas(0); + codedInput.setSizeLimit(prevLimit); + } + public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink cls) { return new 
ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp()); http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 44a742f..a6c8685 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -513,7 +513,7 @@ public class TestClientScanner { anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null)); try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) { Iterator<Result> iter = scanner.iterator(); while (iter.hasNext()) { iter.next(); http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 5b30482..bb580c8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -57,7 +57,7 @@ public class TestIPCUtil { public void before() { this.util = new IPCUtil(new Configuration()); } - + @Test public void testBuildCellBlock() throws IOException { doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); @@ -78,7 +78,7 @@ public class TestIPCUtil { CellScanner 
cellScanner = sized? getSizedCellScanner(cells): CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); + cellScanner = util.createCellScanner(codec, compressor, bb); int i = 0; while (cellScanner.advance()) { i++; http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java index 1530ccd..8aee07b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; /** * Not thread safe! @@ -33,7 +34,7 @@ public class ByteBufferInputStream extends InputStream { private ByteBuffer buf; public ByteBufferInputStream(ByteBuffer buf) { - this.buf = buf; + this.buf = buf; } /** @@ -42,6 +43,7 @@ public class ByteBufferInputStream extends InputStream { * because the end of the stream has been reached, the value <code>-1</code> is returned. * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached. */ + @Override public int read() { if (this.buf.hasRemaining()) { return (this.buf.get() & 0xff); @@ -58,7 +60,8 @@ public class ByteBufferInputStream extends InputStream { * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even * 1 byte can be read because the end of the stream has been reached. 
*/ - public int read(byte b[], int off, int len) { + @Override + public int read(byte[] b, int off, int len) { int avail = available(); if (avail <= 0) { return -1; @@ -71,7 +74,8 @@ public class ByteBufferInputStream extends InputStream { return 0; } - this.buf.get(b, off, len); + ByteBufferUtils.copyFromBufferToArray(b, this.buf, this.buf.position(), off, len); + this.buf.position(this.buf.position() + len); // we should advance the buffer position return len; } @@ -82,6 +86,7 @@ public class ByteBufferInputStream extends InputStream { * @param n the number of bytes to be skipped. * @return the actual number of bytes skipped. */ + @Override public long skip(long n) { long k = Math.min(n, available()); if (k < 0) { @@ -95,7 +100,8 @@ public class ByteBufferInputStream extends InputStream { * @return the number of remaining bytes that can be read (or skipped * over) from this input stream. */ + @Override public int available() { return this.buf.remaining(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index ef3d368..d822248 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -41,6 +41,8 @@ public final class ByteBufferUtils { private final static int VALUE_MASK = 0x7f; private final static int NEXT_BIT_SHIFT = 7; private final static int NEXT_BIT_MASK = 1 << 7; + private static final boolean UNSAFE_AVAIL = UnsafeAccess.isAvailable(); + private static final boolean UNSAFE_UNALIGNED = UnsafeAccess.unaligned(); private ByteBufferUtils() { } @@ -509,4 +511,27 @@ public final class 
ByteBufferUtils { } return len1 - len2; } + + /** + * Copies specified number of bytes from given offset of 'in' ByteBuffer to + * the array. + * @param out + * @param in + * @param sourceOffset + * @param destinationOffset + * @param length + */ + public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOffset, + int destinationOffset, int length) { + if (in.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length); + } else if (UNSAFE_AVAIL) { + UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); + } else { + int oldPos = in.position(); + in.position(sourceOffset); + in.get(out, destinationOffset, length); + in.position(oldPos); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index ef8fe43..5c2bc12 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -43,7 +43,7 @@ public class Threads { private static final Log LOG = LogFactory.getLog(Threads.class); private static final AtomicInteger poolNumber = new AtomicInteger(1); - private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = + public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index 1a0b0e9..47a7f58 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.security.AccessController; import java.security.PrivilegedAction; @@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; @InterfaceAudience.Private @InterfaceStability.Evolving @@ -39,11 +41,15 @@ public final class UnsafeAccess { private static final Log LOG = LogFactory.getLog(UnsafeAccess.class); public static final Unsafe theUnsafe; + private static boolean unaligned = false; /** The offset to the first element in a byte array. */ public static final int BYTE_ARRAY_BASE_OFFSET; - private static boolean unaligned = false; + // This number limits the number of bytes to copy per call to Unsafe's + // copyMemory method. A limit is imposed to allow for safepoint polling + // during a large copy + static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L; static { theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override @@ -89,6 +95,89 @@ public final class UnsafeAccess { return unaligned; } - public static final boolean littleEndian = ByteOrder.nativeOrder() - .equals(ByteOrder.LITTLE_ENDIAN); + + // APIs to copy data. This will be direct memory location copy and will be much faster + /** + * Copies the bytes from given array's offset to length part into the given buffer. 
+ * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) { + long destAddress = destOffset; + Object destBase = null; + if (dest.isDirect()) { + destAddress = destAddress + ((DirectBuffer) dest).address(); + } else { + destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); + destBase = dest.array(); + } + long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET; + unsafeCopy(src, srcAddress, destBase, destAddress, length); + } + + private static void unsafeCopy(Object src, long srcAddr, Object dst, long destAddr, long len) { + while (len > 0) { + long size = (len > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : len; + theUnsafe.copyMemory(src, srcAddr, dst, destAddr, size); + len -= size; + srcAddr += size; + destAddr += size; + } + } + + /** + * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the + * {@code dest} array. + * + * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset, + int length) { + long srcAddress = srcOffset; + Object srcBase = null; + if (src.isDirect()) { + srcAddress = srcAddress + ((DirectBuffer) src).address(); + } else { + srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset(); + srcBase = src.array(); + } + long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET; + unsafeCopy(srcBase, srcAddress, dest, destAddress, length); + } + + /** + * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest} + * buffer. 
+ * + * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset, + int length) { + long srcAddress, destAddress; + Object srcBase = null, destBase = null; + if (src.isDirect()) { + srcAddress = srcOffset + ((DirectBuffer) src).address(); + } else { + srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET; + srcBase = src.array(); + } + if (dest.isDirect()) { + destAddress = destOffset + ((DirectBuffer) dest).address(); + } else { + destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); + destBase = dest.array(); + } + unsafeCopy(srcBase, srcAddress, destBase, destAddress, length); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2859ea0..6bf623d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -112,6 +114,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -485,10 +488,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + @Override public long getResponseCellSize() { return responseCellSize; } + @Override public void incrementResponseCellSize(long cellSize) { responseCellSize += cellSize; } @@ -571,7 +576,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + - ",port=" + port).setDaemon(true).build()); + ",port=" + port).setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); for (int i = 0; i < readThreads; ++i) { Reader reader = new Reader(); readers[i] = reader; @@ -848,7 +854,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { throw ieo; } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage()); + LOG.debug(getName() + ": Caught exception while reading:", e); } count = -1; //so that the (count < 0) block is executed } @@ -894,6 +900,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { Responder() throws IOException { this.setName("RpcServer.responder"); this.setDaemon(true); + this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); writeSelector = Selector.open(); // create a selector } @@ -1311,17 +1318,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return authorizedUgi; } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws 
IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.length + LOG.trace("Have read input token of size " + saslToken.limit() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] b = saslToken.array(); + byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); processUnwrappedData(plaintextData); } } else { @@ -1370,10 +1378,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + replyToken = saslServer.evaluateResponse(saslToken.array()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1569,6 +1577,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } + + // TODO: check dataLength against some limit so that the client cannot OOM the server data = ByteBuffer.allocate(dataLength); // Increment the rpc count. 
This counter will be decreased when we write @@ -1598,9 +1608,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } } finally { @@ -1629,8 +1639,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + this.connectionHeader = ConnectionHeader.parseFrom( + new ByteBufferInputStream(buf)); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1744,13 +1755,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData.array()); + processOneRpc(unwrappedData); unwrappedData = null; } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1772,16 +1784,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. 
We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + ProtobufUtil.mergeFrom(builder, cis, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; int id = header.getCallId(); @@ -1812,19 +1824,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); - // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); + cis.resetSizeCounter(); int paramSize = cis.readRawVarint32(); offset += cis.getTotalBytesRead(); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + ProtobufUtil.mergeFrom(builder, cis, paramSize); param = builder.build(); } offset += paramSize; } if (header.hasCellBlockMeta()) { - cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf.position(offset); + cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index cfdbce0..8438378 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; @@ -217,10 +216,10 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction { if (param == null) { return HConstants.NORMAL_QOS; } - if (param instanceof MultiRequest) { - // The multi call has its priority set in the header. All calls should work this way but - // only this one has been converted so far. No priority == NORMAL_QOS. - return header.hasPriority()? 
header.getPriority(): HConstants.NORMAL_QOS; + + // Trust the client-set priorities if set + if (header.hasPriority()) { + return header.getPriority(); } String cls = param.getClass().getName(); http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3b3f030..7605fd0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5171,6 +5171,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param readLock is the lock reader or writer. True indicates that a non-exlcusive * lock is requested */ + @Override public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { // Make sure the row is inside of this region before getting the lock for it. checkRow(row, "row lock"); @@ -5578,8 +5579,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). 
- List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); - List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); + List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); + List<KeyValueScanner> joinedScanners + = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); if (additionalScanners != null) { scanners.addAll(additionalScanners); } http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f136071..f95446f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1050,8 +1050,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ Region getRegion( final RegionSpecifier regionSpecifier) throws IOException { - return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(), - ProtobufUtil.getRegionEncodedName(regionSpecifier)); + ByteString value = regionSpecifier.getValue(); + RegionSpecifierType type = regionSpecifier.getType(); + switch (type) { + case REGION_NAME: + byte[] regionName = value.toByteArray(); + String encodedRegionName = HRegionInfo.encodeRegionName(regionName); + return regionServer.getRegionByEncodedName(regionName, encodedRegionName); + case ENCODED_REGION_NAME: + return regionServer.getRegionByEncodedName(value.toStringUtf8()); + default: + throw new DoNotRetryIOException( + "Unsupported region specifier type: " + type); + } } @VisibleForTesting